Skip to content

Commit

Permalink
Revert sdk change for confluent kafka mirror list and describe (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
sgagniere authored Aug 11, 2023
1 parent 9e2a8f6 commit 4047726
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 41 deletions.
12 changes: 8 additions & 4 deletions internal/cmd/kafka/command_mirror.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package kafka

import (
"github.com/antihax/optional"
"github.com/spf13/cobra"

"github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
)

Expand Down Expand Up @@ -79,14 +82,15 @@ func (c *mirrorCommand) autocompleteMirrorTopics(cmd *cobra.Command) []string {
return nil
}

mirrors, err := kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, nil)
opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: optional.EmptyInterface()}
listMirrorTopicsResponseDataList, _, err := kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, opts)
if err != nil {
return nil
}

suggestions := make([]string, len(mirrors.GetData()))
for i, mirror := range mirrors.GetData() {
suggestions[i] = mirror.GetMirrorTopicName()
suggestions := make([]string, len(listMirrorTopicsResponseDataList.Data))
for i, mirrorTopic := range listMirrorTopicsResponseDataList.Data {
suggestions[i] = mirrorTopic.MirrorTopicName
}
return suggestions
}
23 changes: 12 additions & 11 deletions internal/cmd/kafka/command_mirror_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/output"
)

Expand Down Expand Up @@ -47,22 +48,22 @@ func (c *mirrorCommand) describe(cmd *cobra.Command, args []string) error {
return err
}

mirror, err := kafkaREST.CloudClient.ReadKafkaMirrorTopic(linkName, mirrorTopicName)
mirror, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.ReadKafkaMirrorTopic(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, mirrorTopicName)
if err != nil {
return err
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
}

list := output.NewList(cmd)
for _, partitionLag := range mirror.GetMirrorLags().Items {
for _, partitionLag := range mirror.MirrorLags {
list.Add(&mirrorOut{
LinkName: mirror.GetLinkName(),
MirrorTopicName: mirror.GetMirrorTopicName(),
SourceTopicName: mirror.GetSourceTopicName(),
MirrorStatus: string(mirror.GetMirrorStatus()),
StatusTimeMs: mirror.GetStateTimeMs(),
Partition: partitionLag.GetPartition(),
PartitionMirrorLag: partitionLag.GetLag(),
LastSourceFetchOffset: partitionLag.GetLastSourceFetchOffset(),
LinkName: mirror.LinkName,
MirrorTopicName: mirror.MirrorTopicName,
SourceTopicName: mirror.SourceTopicName,
MirrorStatus: string(mirror.MirrorStatus),
StatusTimeMs: mirror.StateTimeMs,
Partition: partitionLag.Partition,
PartitionMirrorLag: partitionLag.Lag,
LastSourceFetchOffset: partitionLag.LastSourceFetchOffset,
})
}
list.Filter([]string{"LinkName", "MirrorTopicName", "Partition", "PartitionMirrorLag", "SourceTopicName", "MirrorStatus", "StatusTimeMs", "LastSourceFetchOffset"})
Expand Down
52 changes: 26 additions & 26 deletions internal/cmd/kafka/command_mirror_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package kafka

import (
"fmt"
"strings"
"net/http"

"github.com/antihax/optional"
"github.com/spf13/cobra"

kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"
"github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3"

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/output"
"github.com/confluentinc/cli/internal/pkg/utils"
)
Expand Down Expand Up @@ -60,43 +62,41 @@ func (c *mirrorCommand) list(cmd *cobra.Command, _ []string) error {
return err
}

var mirrorTopicStatus *kafkarestv3.MirrorTopicStatus
mirrorStatusOpt := optional.EmptyInterface()
if mirrorStatus != "" {
mirrorTopicStatus, err = kafkarestv3.NewMirrorTopicStatusFromValue(strings.ToUpper(mirrorStatus))
if err != nil {
return err
}
mirrorStatusOpt = optional.NewInterface(kafkarestv3.MirrorTopicStatus(mirrorStatus))
}

var mirrors kafkarestv3.ListMirrorTopicsResponseDataList
var listMirrorTopicsResponseDataList kafkarestv3.ListMirrorTopicsResponseDataList
var httpResp *http.Response

if linkName == "" {
mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopics(mirrorTopicStatus)
if err != nil {
return err
}
opts := &kafkarestv3.ListKafkaMirrorTopicsOpts{MirrorStatus: mirrorStatusOpt}
listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopics(kafkaREST.Context, kafkaREST.GetClusterId(), opts)
} else {
mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, mirrorTopicStatus)
if err != nil {
return err
}
opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: mirrorStatusOpt}
listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, opts)
}
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
}

list := output.NewList(cmd)
for _, mirror := range mirrors.GetData() {
for _, mirror := range listMirrorTopicsResponseDataList.Data {
var maxLag int64 = 0
for _, mirrorLag := range mirror.GetMirrorLags().Items {
if lag := mirrorLag.GetLag(); lag > maxLag {
maxLag = lag
for _, mirrorLag := range mirror.MirrorLags {
if mirrorLag.Lag > maxLag {
maxLag = mirrorLag.Lag
}
}

list.Add(&mirrorOut{
LinkName: mirror.GetLinkName(),
MirrorTopicName: mirror.GetMirrorTopicName(),
SourceTopicName: mirror.GetSourceTopicName(),
MirrorStatus: string(mirror.GetMirrorStatus()),
StatusTimeMs: mirror.GetStateTimeMs(),
NumPartition: mirror.GetNumPartitions(),
LinkName: mirror.LinkName,
MirrorTopicName: mirror.MirrorTopicName,
SourceTopicName: mirror.SourceTopicName,
MirrorStatus: string(mirror.MirrorStatus),
StatusTimeMs: mirror.StateTimeMs,
NumPartition: mirror.NumPartitions,
MaxPerPartitionMirrorLag: maxLag,
})
}
Expand Down

0 comments on commit 4047726

Please sign in to comment.