Skip to content

Commit

Permalink
Merge pull request #1238 read topic without consumer, fix send partit…
Browse files Browse the repository at this point in the history
…ion number for topic reader
  • Loading branch information
rekby authored May 21, 2024
2 parents a9ac5e1 + 58ac44d commit c98862c
Show file tree
Hide file tree
Showing 9 changed files with 140 additions and 5 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
* Simple implement option WithReaderWithoutConsumer
* Fixed bug: topic didn't send specified partition number to a server

## v3.67.2
* Fixed incorrect formatting of decimal. Implementation of decimal has been reverted to latest working version

Expand Down
5 changes: 5 additions & 0 deletions internal/grpcwrapper/rawtopic/rawtopicreader/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func (r *InitRequest) toProto() *Ydb_Topic.StreamReadMessage_InitRequest {
dstTopicSettings.Path = srcTopicSettings.Path
dstTopicSettings.MaxLag = srcTopicSettings.MaxLag.ToProto()
dstTopicSettings.ReadFrom = srcTopicSettings.ReadFrom.ToProto()

partitionsIDs := make([]int64, len(srcTopicSettings.PartitionsID))
copy(partitionsIDs, srcTopicSettings.PartitionsID)

dstTopicSettings.PartitionIds = partitionsIDs
}

return p
Expand Down
7 changes: 5 additions & 2 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,11 @@ func (c *Client) StartReader(
}
opts = append(defaultOpts, opts...)

internalReader := topicreaderinternal.NewReader(connector, consumer, readSelectors, opts...)
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer)
internalReader, err := topicreaderinternal.NewReader(connector, consumer, readSelectors, opts...)
if err != nil {
return nil, err
}
trace.TopicOnReaderStart(internalReader.Tracer(), internalReader.ID(), consumer, err)

return topicreader.NewReader(internalReader), nil
}
Expand Down
26 changes: 24 additions & 2 deletions internal/topic/topicreaderinternal/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ var (
errors.New("ydb: first connection attempt not finished"),
))
errReaderClosed = xerrors.Wrap(errors.New("ydb: reader closed"))
errUnexpectedEmptyConsumername = xerrors.Wrap(errors.New("ydb: create ydb reader with empty consumer name. Set one of: consumer name or option WithReaderWithoutConsumer")) //nolint:lll
errSetConsumerAndNoConsumer = xerrors.Wrap(errors.New("ydb: reader has non empty consumer name and set option WithReaderWithoutConsumer. Only one of them must be set")) //nolint:lll
errCantCommitWithoutConsumer = xerrors.Wrap(errors.New("ydb: reader can't commit messages without consumer"))
errCommitSessionFromOtherReader = xerrors.Wrap(errors.New("ydb: commit with session from other reader"))
)

Expand Down Expand Up @@ -82,8 +85,13 @@ func NewReader(
consumer string,
readSelectors []PublicReadSelector,
opts ...PublicReaderOption,
) Reader {
) (Reader, error) {
cfg := convertNewParamsToStreamConfig(consumer, readSelectors, opts...)

if err := cfg.Validate(); err != nil {
return Reader{}, err
}

readerID := nextReaderID()

readerConnector := func(ctx context.Context) (batchedStreamReader, error) {
Expand All @@ -108,7 +116,7 @@ func NewReader(
readerID: readerID,
}

return res
return res, nil
}

func (r *Reader) WaitInit(ctx context.Context) error {
Expand Down Expand Up @@ -226,6 +234,20 @@ type ReaderConfig struct {
topicStreamReaderConfig
}

func (c *ReaderConfig) Validate() error {
if c.Consumer != "" && c.ReadWithoutConsumer {
return xerrors.WithStackTrace(errSetConsumerAndNoConsumer)
}
if c.Consumer == "" && !c.ReadWithoutConsumer {
return xerrors.WithStackTrace(errUnexpectedEmptyConsumername)
}
if c.ReadWithoutConsumer && c.CommitMode != CommitModeNone {
return xerrors.WithStackTrace(errCantCommitWithoutConsumer)
}

return nil
}

type PublicReaderOption func(cfg *ReaderConfig)

func WithCredentials(cred credentials.Credentials) PublicReaderOption {
Expand Down
1 change: 1 addition & 0 deletions internal/topic/topicreaderinternal/stream_reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type topicStreamReaderConfig struct {
Cred credentials.Credentials
CredUpdateInterval time.Duration
Consumer string
ReadWithoutConsumer bool
ReadSelectors []*PublicReadSelector
Trace *trace.Topic
GetPartitionStartOffsetCallback PublicGetPartitionStartOffsetFunc
Expand Down
76 changes: 76 additions & 0 deletions tests/integration/topic_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ package integration

import (
"context"
"io"
"os"
"path"
"strings"
"testing"
"time"

Expand All @@ -15,9 +17,11 @@ import (

ydb "github.com/ydb-platform/ydb-go-sdk/v3"
"github.com/ydb-platform/ydb-go-sdk/v3/config"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/version"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xtest"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicoptions"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topictypes"
"github.com/ydb-platform/ydb-go-sdk/v3/topic/topicwriter"
)

const defaultConnectionString = "grpc://localhost:2136/local"
Expand Down Expand Up @@ -155,6 +159,78 @@ func TestSchemeList(t *testing.T) {
require.True(t, hasTopic)
}

func TestReaderWithoutConsumer(t *testing.T) {
t.Run("OK", func(t *testing.T) {
if version.Lt(os.Getenv("YDB_VERSION"), "24.1") {
t.Skip("Read topic without consumer implemented since YDB 24.1, test ran for '" + os.Getenv("YDB_VERSION") + "'")
}
scope := newScope(t)
ctx := scope.Ctx

reader1, err := scope.Driver().Topic().StartReader(
"",
topicoptions.ReadSelectors{
{
Path: scope.TopicPath(),
Partitions: []int64{0},
},
},
topicoptions.WithReaderWithoutConsumer(false),
)
require.NoError(t, err)

reader2, err := scope.Driver().Topic().StartReader(
"",
topicoptions.ReadSelectors{
{
Path: scope.TopicPath(),
Partitions: []int64{0},
},
},
topicoptions.WithReaderWithoutConsumer(false),
)
require.NoError(t, err)

err = scope.TopicWriter().Write(ctx, topicwriter.Message{Data: strings.NewReader("123")})
require.NoError(t, err)

msg1, err := reader1.ReadMessage(ctx)
require.NoError(t, err)
require.Equal(t, int64(1), msg1.SeqNo)

msg1data, err := io.ReadAll(msg1)
require.NoError(t, err)
require.Equal(t, "123", string(msg1data))

msg2, err := reader2.ReadMessage(ctx)
require.NoError(t, err)
require.Equal(t, int64(1), msg2.SeqNo)

msg2data, err := io.ReadAll(msg2)
require.NoError(t, err)
require.Equal(t, "123", string(msg2data))

_ = reader1.Close(ctx)
_ = reader2.Close(ctx)
})
t.Run("NoNameNoOptionErr", func(t *testing.T) {
scope := newScope(t)
topicReader, err := scope.Driver().Topic().StartReader("", topicoptions.ReadTopic(scope.TopicPath()))
require.Error(t, err)
require.Nil(t, topicReader)
})
t.Run("NameAndOption", func(t *testing.T) {
scope := newScope(t)
topicReader, err := scope.Driver().Topic().StartReader(
scope.TopicConsumerName(),
topicoptions.ReadTopic(scope.TopicPath()),
topicoptions.WithReaderWithoutConsumer(false),
)
require.Error(t, err)
require.Nil(t, topicReader)
})
}

func connect(t testing.TB, opts ...ydb.Option) *ydb.Driver {
return connectWithLogOption(t, false, opts...)
}
Expand Down
23 changes: 23 additions & 0 deletions topic/topicoptions/topicoptions_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,3 +257,26 @@ func WithReaderUpdateTokenInterval(interval time.Duration) ReaderOption {
cfg.CredUpdateInterval = interval
}
}

// WithReaderWithoutConsumer allow read topic without consumer.
// Read without consumer is special read mode on a server. In the mode every reader without consumer receive all
// messages from a topic and can't commit them.
// The mode work good if every reader process need all messages (for example for cache invalidation) and no need
// scale process messages by readers count.
//
// saveStateOnReconnection
// - if true: simulate one unbroken stream without duplicate messages (unimplemented)
// - if false: need store progress on client side for prevent re-read messages on internal reconnections to the server.
//
// Experimental: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#experimental
// https://github.com/ydb-platform/ydb-go-sdk/issues/905
func WithReaderWithoutConsumer(saveStateOnReconnection bool) ReaderOption {
if saveStateOnReconnection {
panic("ydb: saveStateOnReconnection mode doesn't implemented yet")
}

return func(cfg *topicreaderinternal.ReaderConfig) {
cfg.ReadWithoutConsumer = true
cfg.CommitMode = CommitModeNone
}
}
1 change: 1 addition & 0 deletions trace/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type (
TopicReaderStartInfo struct {
ReaderID int64
Consumer string
Error error
}

// Internals: https://github.com/ydb-platform/ydb-go-sdk/blob/master/VERSIONING.md#internals
Expand Down
3 changes: 2 additions & 1 deletion trace/topic_gtrace.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c98862c

Please sign in to comment.