diff --git a/internal/cmd/kafka/command_mirror.go b/internal/cmd/kafka/command_mirror.go index b7223b4212..20d4d249cd 100644 --- a/internal/cmd/kafka/command_mirror.go +++ b/internal/cmd/kafka/command_mirror.go @@ -1,8 +1,11 @@ package kafka import ( + "github.com/antihax/optional" "github.com/spf13/cobra" + "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + pcmd "github.com/confluentinc/cli/internal/pkg/cmd" ) @@ -79,14 +82,15 @@ func (c *mirrorCommand) autocompleteMirrorTopics(cmd *cobra.Command) []string { return nil } - mirrors, err := kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, nil) + opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: optional.EmptyInterface()} + listMirrorTopicsResponseDataList, _, err := kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, opts) if err != nil { return nil } - suggestions := make([]string, len(mirrors.GetData())) - for i, mirror := range mirrors.GetData() { - suggestions[i] = mirror.GetMirrorTopicName() + suggestions := make([]string, len(listMirrorTopicsResponseDataList.Data)) + for i, mirrorTopic := range listMirrorTopicsResponseDataList.Data { + suggestions[i] = mirrorTopic.MirrorTopicName } return suggestions } diff --git a/internal/cmd/kafka/command_mirror_describe.go b/internal/cmd/kafka/command_mirror_describe.go index 46301c2298..e626ffd453 100644 --- a/internal/cmd/kafka/command_mirror_describe.go +++ b/internal/cmd/kafka/command_mirror_describe.go @@ -5,6 +5,7 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -47,22 +48,22 @@ func (c *mirrorCommand) describe(cmd *cobra.Command, args []string) error { return err } - mirror, err := kafkaREST.CloudClient.ReadKafkaMirrorTopic(linkName, mirrorTopicName) + mirror, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.ReadKafkaMirrorTopic(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, mirrorTopicName) if err != nil { - return err + return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) } list := output.NewList(cmd) - for _, partitionLag := range mirror.GetMirrorLags().Items { + for _, partitionLag := range mirror.MirrorLags { list.Add(&mirrorOut{ - LinkName: mirror.GetLinkName(), - MirrorTopicName: mirror.GetMirrorTopicName(), - SourceTopicName: mirror.GetSourceTopicName(), - MirrorStatus: string(mirror.GetMirrorStatus()), - StatusTimeMs: mirror.GetStateTimeMs(), - Partition: partitionLag.GetPartition(), - PartitionMirrorLag: partitionLag.GetLag(), - LastSourceFetchOffset: partitionLag.GetLastSourceFetchOffset(), + LinkName: mirror.LinkName, + MirrorTopicName: mirror.MirrorTopicName, + SourceTopicName: mirror.SourceTopicName, + MirrorStatus: string(mirror.MirrorStatus), + StatusTimeMs: mirror.StateTimeMs, + Partition: partitionLag.Partition, + PartitionMirrorLag: partitionLag.Lag, + LastSourceFetchOffset: partitionLag.LastSourceFetchOffset, }) } list.Filter([]string{"LinkName", "MirrorTopicName", "Partition", "PartitionMirrorLag", "SourceTopicName", "MirrorStatus", "StatusTimeMs", "LastSourceFetchOffset"}) diff --git a/internal/cmd/kafka/command_mirror_list.go b/internal/cmd/kafka/command_mirror_list.go index 9d6ebce51e..3ecdb9f0eb 100644 --- a/internal/cmd/kafka/command_mirror_list.go +++ b/internal/cmd/kafka/command_mirror_list.go @@ -2,14 +2,16 @@ package kafka import ( "fmt" - "strings" + "net/http" + "github.com/antihax/optional" "github.com/spf13/cobra" - kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" + "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" + "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/utils" ) @@ -60,43 +62,41 @@ func (c *mirrorCommand) list(cmd *cobra.Command, _ []string) error { return err } - var mirrorTopicStatus *kafkarestv3.MirrorTopicStatus + mirrorStatusOpt := optional.EmptyInterface() if mirrorStatus != "" { - mirrorTopicStatus, err = kafkarestv3.NewMirrorTopicStatusFromValue(strings.ToUpper(mirrorStatus)) - if err != nil { - return err - } + mirrorStatusOpt = optional.NewInterface(kafkarestv3.MirrorTopicStatus(mirrorStatus)) } - var mirrors kafkarestv3.ListMirrorTopicsResponseDataList + var listMirrorTopicsResponseDataList kafkarestv3.ListMirrorTopicsResponseDataList + var httpResp *http.Response + if linkName == "" { - mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopics(mirrorTopicStatus) - if err != nil { - return err - } + opts := &kafkarestv3.ListKafkaMirrorTopicsOpts{MirrorStatus: mirrorStatusOpt} + listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopics(kafkaREST.Context, kafkaREST.GetClusterId(), opts) } else { - mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, mirrorTopicStatus) - if err != nil { - return err - } + opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: mirrorStatusOpt} + listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, kafkaREST.GetClusterId(), linkName, opts) + } + if err != nil { + return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) } list := output.NewList(cmd) - for _, mirror := range mirrors.GetData() { + for _, mirror := range listMirrorTopicsResponseDataList.Data { var maxLag int64 = 0 - for _, mirrorLag := range mirror.GetMirrorLags().Items { - if lag := mirrorLag.GetLag(); lag > maxLag { - maxLag = lag + for _, mirrorLag := range mirror.MirrorLags { + if mirrorLag.Lag > maxLag { + maxLag = mirrorLag.Lag } } list.Add(&mirrorOut{ - LinkName: mirror.GetLinkName(), - MirrorTopicName: mirror.GetMirrorTopicName(), - SourceTopicName: mirror.GetSourceTopicName(), - MirrorStatus: string(mirror.GetMirrorStatus()), - StatusTimeMs: mirror.GetStateTimeMs(), - NumPartition: mirror.GetNumPartitions(), + LinkName: mirror.LinkName, + MirrorTopicName: mirror.MirrorTopicName, + SourceTopicName: mirror.SourceTopicName, + MirrorStatus: string(mirror.MirrorStatus), + StatusTimeMs: mirror.StateTimeMs, + NumPartition: mirror.NumPartitions, MaxPerPartitionMirrorLag: maxLag, }) }