Skip to content

Commit

Permalink
add Topic.DescribeTopicConsumer
Browse files Browse the repository at this point in the history
  • Loading branch information
qrort committed Oct 15, 2024
1 parent 84a2b9b commit 6503633
Show file tree
Hide file tree
Showing 11 changed files with 680 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added `db.Topic().DescribeTopicConsumer()` method for displaying consumer information

## v3.84.1
* Added session info into `trace.TableSessionBulkUpsertStartInfo`

Expand Down
28 changes: 28 additions & 0 deletions internal/grpcwrapper/rawoptional/rawoptional.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,26 @@ func (v *Duration) ToProto() *durationpb.Duration {
return nil
}

func (v *Duration) MustFromProto(proto *durationpb.Duration) {
if proto == nil {
v.Value = time.Duration(0)
v.HasValue = false

return
}

v.HasValue = true
v.Value = proto.AsDuration()
}

func (v *Duration) ToDuration() *time.Duration {
if v.HasValue {
return nil
}

return &v.Value
}

type Int64 struct {
Value int64
HasValue bool
Expand Down Expand Up @@ -74,3 +94,11 @@ func (v *Time) MustFromProto(proto *timestamppb.Timestamp) {
v.HasValue = true
v.Value = proto.AsTime()
}

func (v *Time) ToTime() *time.Time {
if v.HasValue {
return nil
}

return &v.Value
}
24 changes: 21 additions & 3 deletions internal/grpcwrapper/rawtopic/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,27 @@ func (c *Client) CreateTopic(
func (c *Client) DescribeTopic(ctx context.Context, req DescribeTopicRequest) (res DescribeTopicResult, err error) {
resp, err := c.service.DescribeTopic(ctx, req.ToProto())
if err != nil {
return DescribeTopicResult{}, xerrors.WithStackTrace(xerrors.Wrap(
fmt.Errorf("ydb: describe topic grpc failed: %w", err),
))
return DescribeTopicResult{}, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: describe topic grpc failed: %w", err),
),
)
}
err = res.FromProto(resp)

return res, err
}

func (c *Client) DescribeConsumer(ctx context.Context, req DescribeConsumerRequest) (
res DescribeConsumerResult, err error,
) {
resp, err := c.service.DescribeConsumer(ctx, req.ToProto())
if err != nil {
return DescribeConsumerResult{}, xerrors.WithStackTrace(
xerrors.Wrap(
fmt.Errorf("ydb: describe topic consumer grpc failed: %w", err),
),
)
}
err = res.FromProto(resp)

Expand Down
151 changes: 151 additions & 0 deletions internal/grpcwrapper/rawtopic/describe_consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package rawtopic

import (
"fmt"

"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Topic"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/clone"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawoptional"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawscheme"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawtopic/rawtopiccommon"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/grpcwrapper/rawydb"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/operation"
"github.com/ydb-platform/ydb-go-sdk/v3/internal/xerrors"
)

type DescribeConsumerRequest struct {
OperationParams rawydb.OperationParams
Path string
Consumer string
IncludeStats bool
}

func (req *DescribeConsumerRequest) ToProto() *Ydb_Topic.DescribeConsumerRequest {
return &Ydb_Topic.DescribeConsumerRequest{
OperationParams: req.OperationParams.ToProto(),
Path: req.Path,
Consumer: req.Consumer,
IncludeStats: req.IncludeStats,
}
}

type DescribeConsumerResult struct {
Operation rawydb.Operation
Self rawscheme.Entry
Consumer Consumer
Partitions []DescribeConsumerResultPartitionInfo
}

func (res *DescribeConsumerResult) FromProto(response operation.Response) error {
if err := res.Operation.FromProtoWithStatusCheck(response.GetOperation()); err != nil {
return err
}
protoResult := &Ydb_Topic.DescribeConsumerResult{}
if err := response.GetOperation().GetResult().UnmarshalTo(protoResult); err != nil {
return xerrors.WithStackTrace(
fmt.Errorf(
"ydb: describe consumer result failed on unmarshal grpc result: %w", err,
),
)
}

if err := res.Self.FromProto(protoResult.GetSelf()); err != nil {
return err
}

res.Consumer.MustFromProto(protoResult.GetConsumer())

protoPartitions := protoResult.GetPartitions()
res.Partitions = make([]DescribeConsumerResultPartitionInfo, len(protoPartitions))
for i, protoPartition := range protoPartitions {
if err := res.Partitions[i].FromProto(protoPartition); err != nil {
return err
}
}

return nil
}

type MultipleWindowsStat struct {
PerMinute int64
PerHour int64
PerDay int64
}

func (stat *MultipleWindowsStat) MustFromProto(proto *Ydb_Topic.MultipleWindowsStat) {
stat.PerMinute = proto.GetPerMinute()
stat.PerHour = proto.GetPerHour()
stat.PerDay = proto.GetPerDay()
}

type PartitionStats struct {
PartitionsOffset rawtopiccommon.OffsetRange
StoreSizeBytes int64
LastWriteTime rawoptional.Time
MaxWriteTimeLag rawoptional.Duration
BytesWritten MultipleWindowsStat
}

func (ps *PartitionStats) FromProto(proto *Ydb_Topic.PartitionStats) error {
if proto == nil {
return nil
}
if err := ps.PartitionsOffset.FromProto(proto.GetPartitionOffsets()); err != nil {
return err
}
ps.StoreSizeBytes = proto.GetStoreSizeBytes()
ps.LastWriteTime.MustFromProto(proto.GetLastWriteTime())
ps.MaxWriteTimeLag.ToProto()
ps.BytesWritten.MustFromProto(proto.GetBytesWritten())

return nil
}

type PartitionConsumerStats struct {
LastReadOffset int64
CommittedOffset int64
ReadSessionID string
PartitionReadSessionCreateTime rawoptional.Time
LastReadTime rawoptional.Time
MaxReadTimeLag rawoptional.Duration
MaxWriteTimeLag rawoptional.Duration
BytesRead MultipleWindowsStat
ReaderName string
}

func (stats *PartitionConsumerStats) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionConsumerStats) error {
if proto == nil {
return nil
}
stats.LastReadOffset = proto.GetLastReadOffset()
stats.CommittedOffset = proto.GetCommittedOffset()
stats.ReadSessionID = proto.GetReadSessionId()
stats.PartitionReadSessionCreateTime.MustFromProto(proto.GetPartitionReadSessionCreateTime())
stats.LastReadTime.MustFromProto(proto.GetLastReadTime())
stats.MaxReadTimeLag.MustFromProto(proto.GetMaxReadTimeLag())
stats.MaxWriteTimeLag.MustFromProto(proto.GetMaxWriteTimeLag())
stats.BytesRead.MustFromProto(proto.GetBytesRead())
stats.ReaderName = proto.GetReaderName()

return nil
}

type DescribeConsumerResultPartitionInfo struct {
PartitionID int64
Active bool
ChildPartitionIDs []int64
ParentPartitionIDs []int64
PartitionStats PartitionStats
PartitionConsumerStats PartitionConsumerStats
}

func (pi *DescribeConsumerResultPartitionInfo) FromProto(proto *Ydb_Topic.DescribeConsumerResult_PartitionInfo) error {
pi.PartitionID = proto.GetPartitionId()
pi.Active = proto.GetActive()

pi.ChildPartitionIDs = clone.Int64Slice(proto.GetChildPartitionIds())
pi.ParentPartitionIDs = clone.Int64Slice(proto.GetParentPartitionIds())

return pi.PartitionStats.FromProto(proto.GetPartitionStats())
}
48 changes: 48 additions & 0 deletions internal/topic/topicclientinternal/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,54 @@ func (c *Client) Describe(
return res, nil
}

// Describe topic consumer
func (c *Client) DescribeTopicConsumer(
ctx context.Context,
path string,
consumer string,
opts ...topicoptions.DescribeConsumerOption,
) (res topictypes.TopicConsumerDescription, _ error) {
req := rawtopic.DescribeConsumerRequest{
OperationParams: c.defaultOperationParams,
Path: path,
Consumer: consumer,
}

for _, opt := range opts {
if opt != nil {
opt(&req)
}
}

var rawRes rawtopic.DescribeConsumerResult

call := func(ctx context.Context) (describeErr error) {
rawRes, describeErr = c.rawClient.DescribeConsumer(ctx, req)

return describeErr
}

var err error

if c.cfg.AutoRetry() {
err = retry.Retry(ctx, call,
retry.WithIdempotent(true),
retry.WithTrace(c.cfg.TraceRetry()),
retry.WithBudget(c.cfg.RetryBudget()),
)
} else {
err = call(ctx)
}

if err != nil {
return res, err
}

res.FromRaw(&rawRes)

return res, nil
}

// Drop topic
func (c *Client) Drop(ctx context.Context, path string, opts ...topicoptions.DropOption) error {
req := rawtopic.DropTopicRequest{}
Expand Down
Loading

0 comments on commit 6503633

Please sign in to comment.