Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some refactor of experimental topic helpers #1475

Merged
merged 1 commit into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* refactored experimental topic iterators in topicsugar package

## v3.80.9
* Fixed bug in experimental api: `ydb.ParamsBuilder().Param().Optional()` receive pointer and really produce optional value.

Expand Down
55 changes: 55 additions & 0 deletions examples/topic/topicreader/topicreader_simple_iterators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build go1.23

package topicreaderexamples

import (
"context"
"fmt"

"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
firestore "google.golang.org/genproto/firestore/bundle"
)

// IterateOverMessagesAsString is simple example for easy start read messages
// it is not recommend way for heavy-load processing, batch processing usually will faster
func IterateOverMessagesAsString(ctx context.Context, reader *topicreader.Reader) error {
for msg, err := range topicsugar.StringIterator(ctx, reader) {
if err != nil {
return err
}
fmt.Println(msg.Data)
_ = reader.Commit(msg.Context(), msg)
}

return nil
}

// IterateOverStructUnmarshalledFromJSON is example for effective way for unmarshal json message content to value
func IterateOverStructUnmarshalledFromJSON(ctx context.Context, r *topicreader.Reader) error {
//nolint:tagliatelle
type S struct {
MyField int `json:"my_field"`
}

for msg, err := range topicsugar.JSONIterator[S](ctx, r) {
if err != nil {
return err
}
fmt.Println(msg.Data.MyField)
}

return nil
}

// IterateOverProtobufMessages is example for effective way for unmarshal protobuf message content to value
func IterateOverProtobufMessages(ctx context.Context, r *topicreader.Reader) error {
for msg, err := range topicsugar.ProtobufIterator[*firestore.BundledDocumentMetadata](ctx, r) {
if err != nil {
return err
}
fmt.Println(msg.Data.GetName())
}

return nil
}
25 changes: 24 additions & 1 deletion tests/integration/topic_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ func TestMessageReaderIterator(t *testing.T) {
require.Equal(t, []string{"asd", "ddd", "ggg"}, results)
}

func TestStringReaderIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx

err := scope.TopicWriter().Write(ctx,
topicwriter.Message{Data: strings.NewReader("asd")},
topicwriter.Message{Data: strings.NewReader("ddd")},
topicwriter.Message{Data: strings.NewReader("ggg")},
)
require.NoError(t, err)

var results []string
for mess, err := range topicsugar.StringIterator(ctx, scope.TopicReader()) {
require.NoError(t, err)

results = append(results, mess.Data)
if len(results) == 3 {
break
}
}
require.Equal(t, []string{"asd", "ddd", "ggg"}, results)
}

func TestMessageJsonUnmarshalIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx
Expand All @@ -71,7 +94,7 @@ func TestMessageJsonUnmarshalIterator(t *testing.T) {
var results []testStruct
expectedSeqno := int64(1)
expectedOffset := int64(0)
for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) {
for mess, err := range topicsugar.JSONIterator[testStruct](ctx, scope.TopicReader()) {
require.NoError(t, err)
require.Equal(t, expectedSeqno, mess.SeqNo)
require.Equal(t, expectedOffset, mess.Offset)
Expand Down
2 changes: 1 addition & 1 deletion topic/topicsugar/cdc-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ func UnmarshalCDCStream[T YDBCDCItem[K], K any](
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal)
return IteratorFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal)
}
62 changes: 55 additions & 7 deletions topic/topicsugar/iterators.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
//go:build go1.23

package topicsugar

import (
"context"
"encoding/json"
"slices"

"google.golang.org/protobuf/proto"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

// MessageReader is interface for topicreader.Message
// TopicMessageReader is interface for topicreader.Message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type TopicMessageReader interface {
ReadMessage(ctx context.Context) (*topicreader.Message, error)
}

// TopicMessagesIterator is typed representation of cdc event
// TopicMessageIterator iterator wrapper over topic reader
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] {
Expand All @@ -33,24 +38,67 @@ func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[
}
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
// BytesIterator produce iterator over topic messages with Data as []byte, []byte is content of the message
func BytesIterator(
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[[]byte], error] {
var unmarshalFunc TypedUnmarshalFunc[*[]byte] = func(data []byte, dst *[]byte) error {
*dst = slices.Clone(data)

return nil
}

return IteratorFunc[[]byte](ctx, r, unmarshalFunc)
}

// StringIterator produce iterator over topic messages with Data is string, created from message content
func StringIterator(
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[string], error] {
var unmarshalFunc TypedUnmarshalFunc[*string] = func(data []byte, dst *string) error {
*dst = string(data)

return nil
}

return IteratorFunc[string](ctx, r, unmarshalFunc)
}

// JSONIterator produce iterator over topic messages with Data is T, created unmarshalled from message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONIterator[T any](
func JSONIterator[T any](
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[T], error] {
var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error {
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc)
return IteratorFunc[T](ctx, r, unmarshalFunc)
}

// ProtobufIterator produce iterator over topic messages with Data is T, created unmarshalled from message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func ProtobufIterator[T proto.Message](
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[T], error] {
var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error {
return proto.Unmarshal(data, *dst)
}

return IteratorFunc[T](ctx, r, unmarshalFunc)
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
// IteratorFunc produce iterator over topic messages with Data is T,
// created unmarshalled from message by custom function
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONFunc[T any](
func IteratorFunc[T any](
ctx context.Context,
r TopicMessageReader,
f TypedUnmarshalFunc[*T],
Expand Down
Loading