Skip to content

Commit

Permalink
Merge pull request #1475 some refactor of experimental topic helpers
Browse files Browse the repository at this point in the history
  • Loading branch information
rekby authored Sep 18, 2024
2 parents 4d75f71 + eb6cb0c commit f4c4c8a
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* refactored experimental topic iterators in topicsugar package

## v3.80.9
* Fixed bug in experimental api: `ydb.ParamsBuilder().Param().Optional()` receive pointer and really produce optional value.

Expand Down
55 changes: 55 additions & 0 deletions examples/topic/topicreader/topicreader_simple_iterators.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
//go:build go1.23

package topicreaderexamples

import (
"context"
"fmt"

"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicsugar"
firestore "google.golang.org/genproto/firestore/bundle"
)

// IterateOverMessagesAsString is simple example for easy start read messages
// it is not recommend way for heavy-load processing, batch processing usually will faster
func IterateOverMessagesAsString(ctx context.Context, reader *topicreader.Reader) error {
for msg, err := range topicsugar.StringIterator(ctx, reader) {
if err != nil {
return err
}
fmt.Println(msg.Data)
_ = reader.Commit(msg.Context(), msg)
}

return nil
}

// IterateOverStructUnmarshalledFromJSON is example for effective way for unmarshal json message content to value
func IterateOverStructUnmarshalledFromJSON(ctx context.Context, r *topicreader.Reader) error {
//nolint:tagliatelle
type S struct {
MyField int `json:"my_field"`
}

for msg, err := range topicsugar.JSONIterator[S](ctx, r) {
if err != nil {
return err
}
fmt.Println(msg.Data.MyField)
}

return nil
}

// IterateOverProtobufMessages is example for effective way for unmarshal protobuf message content to value
func IterateOverProtobufMessages(ctx context.Context, r *topicreader.Reader) error {
for msg, err := range topicsugar.ProtobufIterator[*firestore.BundledDocumentMetadata](ctx, r) {
if err != nil {
return err
}
fmt.Println(msg.Data.GetName())
}

return nil
}
25 changes: 24 additions & 1 deletion tests/integration/topic_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,29 @@ func TestMessageReaderIterator(t *testing.T) {
require.Equal(t, []string{"asd", "ddd", "ggg"}, results)
}

func TestStringReaderIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx

err := scope.TopicWriter().Write(ctx,
topicwriter.Message{Data: strings.NewReader("asd")},
topicwriter.Message{Data: strings.NewReader("ddd")},
topicwriter.Message{Data: strings.NewReader("ggg")},
)
require.NoError(t, err)

var results []string
for mess, err := range topicsugar.StringIterator(ctx, scope.TopicReader()) {
require.NoError(t, err)

results = append(results, mess.Data)
if len(results) == 3 {
break
}
}
require.Equal(t, []string{"asd", "ddd", "ggg"}, results)
}

func TestMessageJsonUnmarshalIterator(t *testing.T) {
scope := newScope(t)
ctx := scope.Ctx
Expand All @@ -71,7 +94,7 @@ func TestMessageJsonUnmarshalIterator(t *testing.T) {
var results []testStruct
expectedSeqno := int64(1)
expectedOffset := int64(0)
for mess, err := range topicsugar.TopicUnmarshalJSONIterator[testStruct](ctx, scope.TopicReader()) {
for mess, err := range topicsugar.JSONIterator[testStruct](ctx, scope.TopicReader()) {
require.NoError(t, err)
require.Equal(t, expectedSeqno, mess.SeqNo)
require.Equal(t, expectedOffset, mess.Offset)
Expand Down
2 changes: 1 addition & 1 deletion topic/topicsugar/cdc-reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,5 +87,5 @@ func UnmarshalCDCStream[T YDBCDCItem[K], K any](
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal)
return IteratorFunc[YDBCDCMessage[T, K]](ctx, reader, unmarshal)
}
62 changes: 55 additions & 7 deletions topic/topicsugar/iterators.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
//go:build go1.23

package topicsugar

import (
"context"
"encoding/json"
"slices"

"google.golang.org/protobuf/proto"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/xiter"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicreader"
)

// MessageReader is interface for topicreader.Message
// TopicMessageReader is interface for topicreader.Message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
type TopicMessageReader interface {
ReadMessage(ctx context.Context) (*topicreader.Message, error)
}

// TopicMessagesIterator is typed representation of cdc event
// TopicMessageIterator iterator wrapper over topic reader
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[*topicreader.Message, error] {
Expand All @@ -33,24 +38,67 @@ func TopicMessageIterator(ctx context.Context, r TopicMessageReader) xiter.Seq2[
}
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
// BytesIterator produce iterator over topic messages with Data as []byte, []byte is content of the message
func BytesIterator(
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[[]byte], error] {
var unmarshalFunc TypedUnmarshalFunc[*[]byte] = func(data []byte, dst *[]byte) error {
*dst = slices.Clone(data)

return nil
}

return IteratorFunc[[]byte](ctx, r, unmarshalFunc)
}

// StringIterator produce iterator over topic messages with Data is string, created from message content
func StringIterator(
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[string], error] {
var unmarshalFunc TypedUnmarshalFunc[*string] = func(data []byte, dst *string) error {
*dst = string(data)

return nil
}

return IteratorFunc[string](ctx, r, unmarshalFunc)
}

// JSONIterator produce iterator over topic messages with Data is T, created unmarshalled from message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONIterator[T any](
func JSONIterator[T any](
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[T], error] {
var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error {
return json.Unmarshal(data, dst)
}

return TopicUnmarshalJSONFunc[T](ctx, r, unmarshalFunc)
return IteratorFunc[T](ctx, r, unmarshalFunc)
}

// ProtobufIterator produce iterator over topic messages with Data is T, created unmarshalled from message
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func ProtobufIterator[T proto.Message](
ctx context.Context,
r TopicMessageReader,
) xiter.Seq2[*TypedTopicMessage[T], error] {
var unmarshalFunc TypedUnmarshalFunc[*T] = func(data []byte, dst *T) error {
return proto.Unmarshal(data, *dst)
}

return IteratorFunc[T](ctx, r, unmarshalFunc)
}

// TopicUnmarshalJSONIterator is typed representation of cdc event
// IteratorFunc produce iterator over topic messages with Data is T,
// created unmarshalled from message by custom function
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
func TopicUnmarshalJSONFunc[T any](
func IteratorFunc[T any](
ctx context.Context,
r TopicMessageReader,
f TypedUnmarshalFunc[*T],
Expand Down

0 comments on commit f4c4c8a

Please sign in to comment.