From 518e9c26e3069161552883f3761ac4cdcff215a9 Mon Sep 17 00:00:00 2001 From: Steven Gagniere <108363707+sgagniere@users.noreply.github.com> Date: Thu, 3 Aug 2023 13:57:45 -0700 Subject: [PATCH] Cloud Kafka REST cleanup (#2144) --- internal/cmd/asyncapi/command_export.go | 13 +- internal/cmd/asyncapi/command_import.go | 15 +- internal/cmd/kafka/command_acl_create.go | 14 +- internal/cmd/kafka/command_acl_delete.go | 18 +- internal/cmd/kafka/command_acl_list.go | 12 +- .../command_cluster_configuration_describe.go | 7 +- .../command_cluster_configuration_list.go | 7 +- .../command_cluster_configuration_update.go | 7 +- .../cmd/kafka/command_cluster_describe.go | 5 +- internal/cmd/kafka/command_consumergroup.go | 7 +- .../kafka/command_consumergroup_describe.go | 11 +- .../kafka/command_consumergroup_lag_get.go | 7 +- .../kafka/command_consumergroup_lag_list.go | 7 +- .../command_consumergroup_lag_summarize.go | 7 +- .../cmd/kafka/command_consumergroup_list.go | 7 +- .../kafka/command_link_configuration_list.go | 34 +- .../command_link_configuration_update.go | 13 +- internal/cmd/kafka/command_link_create.go | 13 +- internal/cmd/kafka/command_link_delete.go | 13 +- internal/cmd/kafka/command_link_describe.go | 34 +- internal/cmd/kafka/command_link_list.go | 44 +- .../cmd/kafka/command_link_list_onprem.go | 10 +- internal/cmd/kafka/command_mirror.go | 17 +- internal/cmd/kafka/command_mirror_create.go | 13 +- internal/cmd/kafka/command_mirror_describe.go | 32 +- internal/cmd/kafka/command_mirror_failover.go | 22 +- internal/cmd/kafka/command_mirror_list.go | 67 ++- internal/cmd/kafka/command_mirror_pause.go | 22 +- internal/cmd/kafka/command_mirror_promote.go | 22 +- internal/cmd/kafka/command_mirror_resume.go | 47 +- internal/cmd/kafka/command_topic_create.go | 16 +- internal/cmd/kafka/command_topic_delete.go | 13 +- internal/cmd/kafka/command_topic_describe.go | 13 +- internal/cmd/kafka/command_topic_list.go | 12 +- internal/cmd/kafka/command_topic_update.go | 17 +- internal/cmd/kafka/utils.go | 17 - .../cmd/ksql/command_cluster_configureacls.go | 9 +- internal/pkg/ccloudv2/kafkarest.go | 186 ++++--- internal/pkg/cmd/flags.go | 7 +- internal/pkg/cmd/kafka_rest.go | 9 +- internal/pkg/cmd/prerunner.go | 33 +- internal/pkg/errors/strings.go | 6 - internal/pkg/kafka/utils.go | 4 - internal/pkg/kafkarest/kafkarest.go | 4 + ...ration-list-bidirectional-link-json.golden | 2 +- ...ration-list-bidirectional-link-yaml.golden | 2 +- .../kafka/link/configuration-list-json.golden | 2 +- ...ation-list-plain-bidirectional-link.golden | 12 +- .../link/configuration-list-plain.golden | 12 +- .../kafka/link/configuration-list-yaml.golden | 2 +- .../kafka/mirror/describe-mirror-json.golden | 6 +- .../kafka/mirror/describe-mirror-yaml.golden | 6 +- .../kafka/mirror/describe-mirror.golden | 6 +- .../kafka/mirror/failover-mirror.golden | 6 + .../kafka/mirror/list-all-mirror-json.golden | 4 +- .../kafka/mirror/list-all-mirror-yaml.golden | 4 +- .../kafka/mirror/list-all-mirror.golden | 4 +- .../output/kafka/mirror/list-help.golden | 2 +- .../kafka/mirror/list-mirror-json.golden | 4 +- .../kafka/mirror/list-mirror-yaml.golden | 4 +- .../output/kafka/mirror/list-mirror.golden | 4 +- .../output/kafka/mirror/pause-mirror.golden | 5 + .../output/kafka/mirror/resume-mirror.golden | 5 + test/kafka_test.go | 3 + test/test-server/kafka_rest_router.go | 501 ++++++++++++------ 65 files changed, 742 insertions(+), 747 deletions(-) create mode 100644 test/fixtures/output/kafka/mirror/failover-mirror.golden create mode 100644 test/fixtures/output/kafka/mirror/pause-mirror.golden create mode 100644 test/fixtures/output/kafka/mirror/resume-mirror.golden diff --git a/internal/cmd/asyncapi/command_export.go b/internal/cmd/asyncapi/command_export.go index 49c3a899c0..b0881056a6 100644 --- a/internal/cmd/asyncapi/command_export.go +++ b/internal/cmd/asyncapi/command_export.go @@ -20,7 +20,6 @@ import ( v1 "github.com/confluentinc/cli/internal/pkg/config/v1" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/log" "github.com/confluentinc/cli/internal/pkg/output" schemaregistry "github.com/confluentinc/cli/internal/pkg/schema-registry" @@ -191,7 +190,7 @@ func (c *command) getChannelDetails(details *accountDetails, flags *flags) error } details.channelDetails.example = example } - bindings, err := c.getBindings(details.kafkaClusterId, details.channelDetails.currentTopic.GetTopicName()) + bindings, err := c.getBindings(details.channelDetails.currentTopic.GetTopicName()) if err != nil { log.CliLogger.Warnf("Bindings not found: %v", err) } @@ -302,17 +301,17 @@ func (c command) getMessageExamples(consumer *ckgo.Consumer, topicName, contentT return jsonMessage, nil } -func (c *command) getBindings(clusterId, topicName string) (*bindings, error) { +func (c *command) getBindings(topicName string) (*bindings, error) { kafkaREST, err := c.GetKafkaREST() if err != nil { return nil, err } - configs, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(clusterId, topicName) + configs, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(topicName) if err != nil { return nil, err } var numPartitions int32 - partitionsResp, _, err := kafkaREST.CloudClient.ListKafkaPartitions(clusterId, topicName) + partitionsResp, _, err := kafkaREST.CloudClient.ListKafkaPartitions(topicName) if err != nil { return nil, fmt.Errorf("unable to get topic partitions: %v", err) } @@ -401,9 +400,9 @@ func (c *command) getClusterDetails(details *accountDetails, flags *flags) error return err } - topics, httpResp, err := kafkaREST.CloudClient.ListKafkaTopics(clusterConfig.ID) + topics, err := kafkaREST.CloudClient.ListKafkaTopics() if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } environment, err := c.Context.EnvironmentId() diff --git a/internal/cmd/asyncapi/command_import.go b/internal/cmd/asyncapi/command_import.go index 5398b5627f..61146f85fc 100644 --- a/internal/cmd/asyncapi/command_import.go +++ b/internal/cmd/asyncapi/command_import.go @@ -258,16 +258,16 @@ func (c *command) addTopic(details *accountDetails, topicName string, kafkaBindi return true, false, nil } // Overwrite existing topic - err := c.updateTopic(details, topicName, kafkaBinding) + err := c.updateTopic(topicName, kafkaBinding) return true, false, err } } // Create a new topic - newTopicCreated, err := c.createTopic(details, topicName, kafkaBinding) + newTopicCreated, err := c.createTopic(topicName, kafkaBinding) return false, newTopicCreated, err } -func (c *command) createTopic(details *accountDetails, topicName string, kafkaBinding kafkaBinding) (bool, error) { +func (c *command) createTopic(topicName string, kafkaBinding kafkaBinding) (bool, error) { log.CliLogger.Infof("Topic not found. Adding a new topic: %s", topicName) topicConfigs := []kafkarestv3.CreateTopicRequestDataConfigs{} for configName, configValue := range combineTopicConfigs(kafkaBinding) { @@ -288,8 +288,7 @@ func (c *command) createTopic(details *accountDetails, topicName string, kafkaBi if err != nil { return false, err } - if _, httpResp, err := kafkaRest.CloudClient.CreateKafkaTopic(details.kafkaClusterId, - createTopicRequestData); err != nil { + if _, httpResp, err := kafkaRest.CloudClient.CreateKafkaTopic(createTopicRequestData); err != nil { restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err) if parseErr == nil && restErr.Code == ccloudv2.BadRequestErrorCode { // Print partition limit error w/ suggestion @@ -303,7 +302,7 @@ func (c *command) createTopic(details *accountDetails, topicName string, kafkaBi return true, nil } -func (c *command) updateTopic(details *accountDetails, topicName string, kafkaBinding kafkaBinding) error { +func (c *command) updateTopic(topicName string, kafkaBinding kafkaBinding) error { // Overwrite topic configs updateConfigs := []kafkarestv3.AlterConfigBatchRequestDataData{} modifiableConfigs := []string{} @@ -311,7 +310,7 @@ func (c *command) updateTopic(details *accountDetails, topicName string, kafkaBi if err != nil { return err } - configs, err := kafkaRest.CloudClient.ListKafkaTopicConfigs(details.kafkaClusterId, topicName) + configs, err := kafkaRest.CloudClient.ListKafkaTopicConfigs(topicName) if err != nil { return err } @@ -331,7 +330,7 @@ func (c *command) updateTopic(details *accountDetails, topicName string, kafkaBi } log.CliLogger.Info("Overwriting topic configs") if updateConfigs != nil { - _, err = kafkaRest.CloudClient.UpdateKafkaTopicConfigBatch(details.kafkaClusterId, topicName, kafkarestv3.AlterConfigBatchRequestData{Data: updateConfigs}) + _, err = kafkaRest.CloudClient.UpdateKafkaTopicConfigBatch(topicName, kafkarestv3.AlterConfigBatchRequestData{Data: updateConfigs}) if err != nil { return fmt.Errorf("unable to update topic configs: %v", err) } diff --git a/internal/cmd/kafka/command_acl_create.go b/internal/cmd/kafka/command_acl_create.go index 83b0af9675..14285ab6f1 100644 --- a/internal/cmd/kafka/command_acl_create.go +++ b/internal/cmd/kafka/command_acl_create.go @@ -9,7 +9,6 @@ import ( "github.com/confluentinc/cli/internal/pkg/ccstructs" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) func (c *aclCommand) newCreateCommand() *cobra.Command { @@ -66,27 +65,22 @@ func (c *aclCommand) create(cmd *cobra.Command, _ []string) error { bindings[i] = acl.ACLBinding } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } for i, binding := range bindings { data := pacl.GetCreateAclRequestData(binding) - if httpResp, err := kafkaREST.CloudClient.CreateKafkaAcls(kafkaClusterConfig.ID, data); err != nil { + if err := kafkaREST.CloudClient.CreateKafkaAcls(data); err != nil { if i > 0 { _ = pacl.PrintACLs(cmd, bindings[:i]) } - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } } diff --git a/internal/cmd/kafka/command_acl_delete.go b/internal/cmd/kafka/command_acl_delete.go index d5b84541de..bc06bc8cb2 100644 --- a/internal/cmd/kafka/command_acl_delete.go +++ b/internal/cmd/kafka/command_acl_delete.go @@ -9,7 +9,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/form" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -60,25 +59,20 @@ func (c *aclCommand) delete(cmd *cobra.Command, _ []string) error { filters[i] = convertToFilter(acl.ACLBinding) } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } count := 0 for _, acl := range acls { - aclDataList, httpResp, err := kafkaREST.CloudClient.GetKafkaAcls(kafkaClusterConfig.ID, acl.ACLBinding) + aclDataList, err := kafkaREST.CloudClient.GetKafkaAcls(acl.ACLBinding) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } if len(aclDataList.Data) == 0 { return errors.NewErrorWithSuggestions("one or more ACLs matching these parameters not found", ValidACLSuggestion) @@ -96,12 +90,12 @@ func (c *aclCommand) delete(cmd *cobra.Command, _ []string) error { count = 0 for i, filter := range filters { - deleteResp, httpResp, err := kafkaREST.CloudClient.DeleteKafkaAcls(kafkaClusterConfig.ID, filter) + deleteResp, err := kafkaREST.CloudClient.DeleteKafkaAcls(filter) if err != nil { if i > 0 { output.ErrPrintln(printAclsDeleted(count)) } - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } count += len(deleteResp.Data) diff --git a/internal/cmd/kafka/command_acl_list.go b/internal/cmd/kafka/command_acl_list.go index b0997edb0c..33793be2f0 100644 --- a/internal/cmd/kafka/command_acl_list.go +++ b/internal/cmd/kafka/command_acl_list.go @@ -5,7 +5,6 @@ import ( aclutil "github.com/confluentinc/cli/internal/pkg/acl" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) func (c *aclCommand) newListCommand() *cobra.Command { @@ -43,24 +42,19 @@ func (c *aclCommand) list(cmd *cobra.Command, _ []string) error { return acl[0].errors } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } - kafkaREST, err := c.GetKafkaREST() + aclDataList, err := kafkaREST.CloudClient.GetKafkaAcls(acl[0].ACLBinding) if err != nil { return err } - aclDataList, httpResp, err := kafkaREST.CloudClient.GetKafkaAcls(kafkaClusterConfig.ID, acl[0].ACLBinding) - if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) - } - return aclutil.PrintACLsFromKafkaRestResponse(cmd, aclDataList.Data) } diff --git a/internal/cmd/kafka/command_cluster_configuration_describe.go b/internal/cmd/kafka/command_cluster_configuration_describe.go index 6e9e8b61d2..4350448672 100644 --- a/internal/cmd/kafka/command_cluster_configuration_describe.go +++ b/internal/cmd/kafka/command_cluster_configuration_describe.go @@ -39,12 +39,7 @@ func (c *clusterCommand) configurationDescribe(cmd *cobra.Command, args []string return err } - cluster, err := c.Context.GetKafkaClusterForCommand() - if err != nil { - return err - } - - config, err := kafkaREST.CloudClient.GetKafkaClusterConfig(cluster.ID, args[0]) + config, err := kafkaREST.CloudClient.GetKafkaClusterConfig(args[0]) if err != nil { return catchConfigurationNotFound(err, args[0]) } diff --git a/internal/cmd/kafka/command_cluster_configuration_list.go b/internal/cmd/kafka/command_cluster_configuration_list.go index e736231b79..d12902b6c2 100644 --- a/internal/cmd/kafka/command_cluster_configuration_list.go +++ b/internal/cmd/kafka/command_cluster_configuration_list.go @@ -29,12 +29,7 @@ func (c *clusterCommand) configurationList(cmd *cobra.Command, args []string) er return err } - cluster, err := c.Context.GetKafkaClusterForCommand() - if err != nil { - return err - } - - configs, err := kafkaREST.CloudClient.ListKafkaClusterConfigs(cluster.ID) + configs, err := kafkaREST.CloudClient.ListKafkaClusterConfigs() if err != nil { return err } diff --git a/internal/cmd/kafka/command_cluster_configuration_update.go b/internal/cmd/kafka/command_cluster_configuration_update.go index 901568fed0..92563a2333 100644 --- a/internal/cmd/kafka/command_cluster_configuration_update.go +++ b/internal/cmd/kafka/command_cluster_configuration_update.go @@ -45,11 +45,6 @@ func (c *clusterCommand) configurationUpdate(cmd *cobra.Command, _ []string) err return err } - cluster, err := c.Context.GetKafkaClusterForCommand() - if err != nil { - return err - } - config, err := cmd.Flags().GetStringSlice("config") if err != nil { return err @@ -71,7 +66,7 @@ func (c *clusterCommand) configurationUpdate(cmd *cobra.Command, _ []string) err } req := kafkarestv3.AlterConfigBatchRequestData{Data: data} - if err := kafkaREST.CloudClient.UpdateKafkaClusterConfigs(cluster.ID, req); err != nil { + if err := kafkaREST.CloudClient.UpdateKafkaClusterConfigs(req); err != nil { return err } diff --git a/internal/cmd/kafka/command_cluster_describe.go b/internal/cmd/kafka/command_cluster_describe.go index fa009d2022..1ae5054ab7 100644 --- a/internal/cmd/kafka/command_cluster_describe.go +++ b/internal/cmd/kafka/command_cluster_describe.go @@ -11,7 +11,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" v1 "github.com/confluentinc/cli/internal/pkg/config/v1" "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/log" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/resource" @@ -180,9 +179,9 @@ func (c *clusterCommand) getTopicCountForKafkaCluster(cluster *cmkv2.CmkV2Cluste return 0, err } - topics, httpResp, err := kafkaREST.CloudClient.ListKafkaTopics(cluster.GetId()) + topics, err := kafkaREST.CloudClient.ListKafkaTopics() if err != nil { - return 0, kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return 0, err } return len(topics.Data), nil diff --git a/internal/cmd/kafka/command_consumergroup.go b/internal/cmd/kafka/command_consumergroup.go index 9fc2ed90ea..8535344f17 100644 --- a/internal/cmd/kafka/command_consumergroup.go +++ b/internal/cmd/kafka/command_consumergroup.go @@ -6,7 +6,6 @@ import ( kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) type consumerGroupCommand struct { @@ -83,14 +82,14 @@ func (c *consumerGroupCommand) autocompleteConsumerGroups() []string { } func listConsumerGroups(flagCmd *pcmd.AuthenticatedCLICommand) (*kafkarestv3.ConsumerGroupDataList, error) { - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(flagCmd) + kafkaREST, err := flagCmd.GetKafkaREST() if err != nil { return nil, err } - groupCmdResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups(lkc) + groupCmdResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups() if err != nil { - return nil, kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return nil, err } return &groupCmdResp, nil diff --git a/internal/cmd/kafka/command_consumergroup_describe.go b/internal/cmd/kafka/command_consumergroup_describe.go index 5e7c1edf27..e70dc8b0c6 100644 --- a/internal/cmd/kafka/command_consumergroup_describe.go +++ b/internal/cmd/kafka/command_consumergroup_describe.go @@ -9,7 +9,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -40,19 +39,19 @@ func (c *consumerGroupCommand) newDescribeCommand() *cobra.Command { func (c *consumerGroupCommand) describe(cmd *cobra.Command, args []string) error { consumerGroupId := args[0] - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand) + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - groupCmdResp, httpResp, err := kafkaREST.CloudClient.GetKafkaConsumerGroup(lkc, consumerGroupId) + groupCmdResp, err := kafkaREST.CloudClient.GetKafkaConsumerGroup(consumerGroupId) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } - groupCmdConsumersResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumers(lkc, consumerGroupId) + groupCmdConsumersResp, err := kafkaREST.CloudClient.ListKafkaConsumers(consumerGroupId) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } groupData := getGroupData(groupCmdResp, groupCmdConsumersResp) diff --git a/internal/cmd/kafka/command_consumergroup_lag_get.go b/internal/cmd/kafka/command_consumergroup_lag_get.go index 6ac8f2f2ed..a0e882902a 100644 --- a/internal/cmd/kafka/command_consumergroup_lag_get.go +++ b/internal/cmd/kafka/command_consumergroup_lag_get.go @@ -5,7 +5,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -52,14 +51,14 @@ func (c *lagCommand) getLag(cmd *cobra.Command, args []string) error { return err } - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand) + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - lagGetResp, httpResp, err := kafkaREST.CloudClient.GetKafkaConsumerLag(lkc, consumerGroupId, topic, partition) + lagGetResp, err := kafkaREST.CloudClient.GetKafkaConsumerLag(consumerGroupId, topic, partition) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } table := output.NewTable(cmd) diff --git a/internal/cmd/kafka/command_consumergroup_lag_list.go b/internal/cmd/kafka/command_consumergroup_lag_list.go index a114b9c70f..ebf189a05f 100644 --- a/internal/cmd/kafka/command_consumergroup_lag_list.go +++ b/internal/cmd/kafka/command_consumergroup_lag_list.go @@ -5,7 +5,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -34,14 +33,14 @@ func (c *lagCommand) newListCommand() *cobra.Command { } func (c *lagCommand) list(cmd *cobra.Command, args []string) error { - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand) + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - lagSummaryResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumerLags(lkc, args[0]) + lagSummaryResp, err := kafkaREST.CloudClient.ListKafkaConsumerLags(args[0]) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } list := output.NewList(cmd) diff --git a/internal/cmd/kafka/command_consumergroup_lag_summarize.go b/internal/cmd/kafka/command_consumergroup_lag_summarize.go index c400581eeb..1b607863a2 100644 --- a/internal/cmd/kafka/command_consumergroup_lag_summarize.go +++ b/internal/cmd/kafka/command_consumergroup_lag_summarize.go @@ -5,7 +5,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -48,14 +47,14 @@ func (c *lagCommand) newSummarizeCommand() *cobra.Command { func (c *lagCommand) summarize(cmd *cobra.Command, args []string) error { consumerGroupId := args[0] - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand) + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - summary, httpResp, err := kafkaREST.CloudClient.GetKafkaConsumerGroupLagSummary(lkc, consumerGroupId) + summary, err := kafkaREST.CloudClient.GetKafkaConsumerGroupLagSummary(consumerGroupId) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } table := output.NewTable(cmd) diff --git a/internal/cmd/kafka/command_consumergroup_list.go b/internal/cmd/kafka/command_consumergroup_list.go index 7279a3f078..4258fc641b 100644 --- a/internal/cmd/kafka/command_consumergroup_list.go +++ b/internal/cmd/kafka/command_consumergroup_list.go @@ -5,7 +5,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -33,14 +32,14 @@ func (c *consumerGroupCommand) newListCommand() *cobra.Command { } func (c *consumerGroupCommand) list(cmd *cobra.Command, _ []string) error { - kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand) + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - groupCmdResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups(lkc) + groupCmdResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups() if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } list := output.NewList(cmd) diff --git a/internal/cmd/kafka/command_link_configuration_list.go b/internal/cmd/kafka/command_link_configuration_list.go index c91497f077..8beb7922cf 100644 --- a/internal/cmd/kafka/command_link_configuration_list.go +++ b/internal/cmd/kafka/command_link_configuration_list.go @@ -4,8 +4,6 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -39,43 +37,31 @@ func (c *linkCommand) configurationList(cmd *cobra.Command, args []string) error linkName := args[0] kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - listLinkConfigsRespData, httpResp, err := kafkaREST.CloudClient.ListKafkaLinkConfigs(cluster.ID, linkName) + configs, err := kafkaREST.CloudClient.ListKafkaLinkConfigs(linkName) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } list := output.NewList(cmd) - if len(listLinkConfigsRespData.Data) == 0 { - return list.Print() - } - list.Add(&linkConfigurationOut{ ConfigName: "dest.cluster.id", - ConfigValue: listLinkConfigsRespData.Data[0].ClusterId, + ConfigValue: kafkaREST.GetClusterId(), ReadOnly: true, Sensitive: true, }) - for _, config := range listLinkConfigsRespData.Data { + for _, config := range configs.GetData() { list.Add(&linkConfigurationOut{ - ConfigName: config.Name, - ConfigValue: config.Value, - ReadOnly: config.ReadOnly, - Sensitive: config.Sensitive, - Source: config.Source, - Synonyms: config.Synonyms, + ConfigName: config.GetName(), + ConfigValue: config.GetValue(), + ReadOnly: config.GetReadOnly(), + Sensitive: config.GetSensitive(), + Source: config.GetSource(), + Synonyms: config.GetSynonyms(), }) } return list.Print() diff --git a/internal/cmd/kafka/command_link_configuration_update.go b/internal/cmd/kafka/command_link_configuration_update.go index 57d1ac8636..7cebe37a09 100644 --- a/internal/cmd/kafka/command_link_configuration_update.go +++ b/internal/cmd/kafka/command_link_configuration_update.go @@ -6,7 +6,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/properties" "github.com/confluentinc/cli/internal/pkg/resource" @@ -58,22 +57,14 @@ func (c *linkCommand) configurationUpdate(cmd *cobra.Command, args []string) err } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } data := toAlterConfigBatchRequestData(configMap) - if httpResp, err := kafkaREST.CloudClient.UpdateKafkaLinkConfigBatch(cluster.ID, linkName, data); err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + if err := kafkaREST.CloudClient.UpdateKafkaLinkConfigBatch(linkName, data); err != nil { + return err } output.Printf(errors.UpdatedResourceMsg, resource.ClusterLink, linkName) diff --git a/internal/cmd/kafka/command_link_create.go b/internal/cmd/kafka/command_link_create.go index 98ea656cac..d7f9208d74 100644 --- a/internal/cmd/kafka/command_link_create.go +++ b/internal/cmd/kafka/command_link_create.go @@ -10,7 +10,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/properties" "github.com/confluentinc/cli/internal/pkg/resource" @@ -170,20 +169,12 @@ func (c *linkCommand) create(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - if httpResp, err := kafkaREST.CloudClient.CreateKafkaLink(cluster.ID, linkName, !noValidate, dryRun, data); err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + if err := kafkaREST.CloudClient.CreateKafkaLink(linkName, !noValidate, dryRun, data); err != nil { + return err } msg := fmt.Sprintf(errors.CreatedLinkResourceMsg, resource.ClusterLink, linkName, linkConfigsCommandOutput(configMap)) diff --git a/internal/cmd/kafka/command_link_delete.go b/internal/cmd/kafka/command_link_delete.go index a2f159681f..8123c64d9b 100644 --- a/internal/cmd/kafka/command_link_delete.go +++ b/internal/cmd/kafka/command_link_delete.go @@ -8,7 +8,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/form" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/resource" ) @@ -34,14 +33,6 @@ func (c *linkCommand) delete(cmd *cobra.Command, args []string) error { linkName := args[0] kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } @@ -51,8 +42,8 @@ func (c *linkCommand) delete(cmd *cobra.Command, args []string) error { return err } - if httpResp, err := kafkaREST.CloudClient.DeleteKafkaLink(cluster.ID, linkName); err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + if err := kafkaREST.CloudClient.DeleteKafkaLink(linkName); err != nil { + return err } output.Printf(errors.DeletedResourceMsg, resource.ClusterLink, linkName) diff --git a/internal/cmd/kafka/command_link_describe.go b/internal/cmd/kafka/command_link_describe.go index 140fbcec2c..3d6b960089 100644 --- a/internal/cmd/kafka/command_link_describe.go +++ b/internal/cmd/kafka/command_link_describe.go @@ -6,8 +6,6 @@ import ( kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -43,42 +41,34 @@ func (c *linkCommand) describe(cmd *cobra.Command, args []string) error { linkName := args[0] kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - data, httpResp, err := kafkaREST.CloudClient.GetKafkaLink(cluster.ID, linkName) + link, err := kafkaREST.CloudClient.GetKafkaLink(linkName) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } table := output.NewTable(cmd) - table.Add(newDescribeLink(data, "")) + table.Add(newDescribeLink(link, "")) table.Filter(getListFields(false)) return table.Print() } -func newDescribeLink(data kafkarestv3.ListLinksResponseData, topic string) *describeOut { +func newDescribeLink(link kafkarestv3.ListLinksResponseData, topic string) *describeOut { var linkError string - if data.GetLinkError() != "NO_ERROR" { - linkError = data.GetLinkError() + if link.GetLinkError() != "NO_ERROR" { + linkError = link.GetLinkError() } return &describeOut{ - Name: data.LinkName, + Name: link.GetLinkName(), TopicName: topic, - SourceClusterId: data.GetSourceClusterId(), - DestinationClusterId: data.GetDestinationClusterId(), - RemoteClusterId: data.GetRemoteClusterId(), - State: data.GetLinkState(), + SourceClusterId: link.GetSourceClusterId(), + DestinationClusterId: link.GetDestinationClusterId(), + RemoteClusterId: link.GetRemoteClusterId(), + State: link.GetLinkState(), Error: linkError, - ErrorMessage: data.GetLinkErrorMessage(), + ErrorMessage: link.GetLinkErrorMessage(), } } diff --git a/internal/cmd/kafka/command_link_list.go b/internal/cmd/kafka/command_link_list.go index 54fd2af289..4c347bfedf 100644 --- a/internal/cmd/kafka/command_link_list.go +++ b/internal/cmd/kafka/command_link_list.go @@ -6,14 +6,12 @@ import ( kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) const includeTopicsFlagName = "include-topics" -type link struct { +type listOut struct { Name string `human:"Name" serialized:"link_name"` TopicName string `human:"Topic Name" serialized:"topic_name"` SourceClusterId string `human:"Source Cluster" serialized:"source_cluster_id"` @@ -24,20 +22,20 @@ type link struct { ErrorMessage string `human:"Error Message" serialized:"error_message"` } -func newLink(data kafkarestv3.ListLinksResponseData, topic string) *link { +func newLink(link kafkarestv3.ListLinksResponseData, topic string) *listOut { var linkError string - if data.GetLinkError() != "NO_ERROR" { - linkError = data.GetLinkError() + if link.GetLinkError() != "NO_ERROR" { + linkError = link.GetLinkError() } - return &link{ - Name: data.LinkName, + return &listOut{ + Name: link.GetLinkName(), TopicName: topic, - SourceClusterId: data.GetSourceClusterId(), - DestinationClusterId: data.GetDestinationClusterId(), - RemoteClusterId: data.GetRemoteClusterId(), - State: data.GetLinkState(), + SourceClusterId: link.GetSourceClusterId(), + DestinationClusterId: link.GetDestinationClusterId(), + RemoteClusterId: link.GetRemoteClusterId(), + State: link.GetLinkState(), Error: linkError, - ErrorMessage: data.GetLinkErrorMessage(), + ErrorMessage: link.GetLinkErrorMessage(), } } @@ -66,31 +64,23 @@ func (c *linkCommand) list(cmd *cobra.Command, _ []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - listLinksRespDataList, httpResp, err := kafkaREST.CloudClient.ListKafkaLinks(cluster.ID) + links, err := kafkaREST.CloudClient.ListKafkaLinks() if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } list := output.NewList(cmd) - for _, data := range listLinksRespDataList.Data { + for _, link := range links.GetData() { if includeTopics { - for _, topic := range data.GetTopicNames() { - list.Add(newLink(data, topic)) + for _, topic := range link.GetTopicNames() { + list.Add(newLink(link, topic)) } } else { - list.Add(newLink(data, "")) + list.Add(newLink(link, "")) } } list.Filter(getListFields(includeTopics)) diff --git a/internal/cmd/kafka/command_link_list_onprem.go b/internal/cmd/kafka/command_link_list_onprem.go index b2ad5b6135..9095d41723 100644 --- a/internal/cmd/kafka/command_link_list_onprem.go +++ b/internal/cmd/kafka/command_link_list_onprem.go @@ -9,21 +9,21 @@ import ( "github.com/confluentinc/cli/internal/pkg/output" ) -func newLinkOnPrem(data kafkarestv3.ListLinksResponseData, topic string) *link { - l := &link{ +func newLinkOnPrem(data kafkarestv3.ListLinksResponseData, topic string) *listOut { + listEntry := &listOut{ Name: data.LinkName, TopicName: topic, } if data.SourceClusterId != nil { - l.SourceClusterId = *data.SourceClusterId + listEntry.SourceClusterId = *data.SourceClusterId } if data.DestinationClusterId != nil { - l.DestinationClusterId = *data.DestinationClusterId + listEntry.DestinationClusterId = *data.DestinationClusterId } - return l + return listEntry } func (c *linkCommand) newListCommandOnPrem() *cobra.Command { diff --git a/internal/cmd/kafka/command_mirror.go b/internal/cmd/kafka/command_mirror.go index ad92c9da69..b7223b4212 100644 --- a/internal/cmd/kafka/command_mirror.go +++ b/internal/cmd/kafka/command_mirror.go @@ -1,11 +1,8 @@ package kafka import ( - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" - pcmd "github.com/confluentinc/cli/internal/pkg/cmd" ) @@ -78,24 +75,18 @@ func (c *mirrorCommand) autocompleteMirrorTopics(cmd *cobra.Command) []string { } kafkaREST, err := c.GetKafkaREST() - if err != nil || kafkaREST == nil { - return nil - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return nil } - opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: optional.EmptyInterface()} - listMirrorTopicsResponseDataList, _, err := kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, cluster.ID, linkName, opts) + mirrors, err := kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, nil) if err != nil { return nil } - suggestions := make([]string, len(listMirrorTopicsResponseDataList.Data)) - for i, mirrorTopic := range listMirrorTopicsResponseDataList.Data { - suggestions[i] = mirrorTopic.MirrorTopicName + suggestions := make([]string, len(mirrors.GetData())) + for i, mirror := range mirrors.GetData() { + suggestions[i] = mirror.GetMirrorTopicName() } return suggestions } diff --git a/internal/cmd/kafka/command_mirror_create.go b/internal/cmd/kafka/command_mirror_create.go index a9923de6c0..250b57f7a9 100644 --- a/internal/cmd/kafka/command_mirror_create.go +++ b/internal/cmd/kafka/command_mirror_create.go @@ -8,7 +8,6 @@ import ( pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/properties" "github.com/confluentinc/cli/internal/pkg/resource" @@ -85,14 +84,6 @@ func (c *mirrorCommand) create(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } @@ -109,8 +100,8 @@ func (c *mirrorCommand) create(cmd *cobra.Command, args []string) error { createMirrorTopicRequestData.MirrorTopicName = &mirrorTopicName } - if httpResp, err := kafkaREST.CloudClient.CreateKafkaMirrorTopic(cluster.ID, linkName, createMirrorTopicRequestData); err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + if err := kafkaREST.CloudClient.CreateKafkaMirrorTopic(linkName, createMirrorTopicRequestData); err != nil { + return err } output.Printf(errors.CreatedResourceMsg, resource.MirrorTopic, mirrorTopicName) diff --git a/internal/cmd/kafka/command_mirror_describe.go b/internal/cmd/kafka/command_mirror_describe.go index 185a803903..46301c2298 100644 --- a/internal/cmd/kafka/command_mirror_describe.go +++ b/internal/cmd/kafka/command_mirror_describe.go @@ -4,9 +4,7 @@ import ( "github.com/spf13/cobra" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -45,34 +43,26 @@ func (c *mirrorCommand) describe(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - mirror, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.ReadKafkaMirrorTopic(kafkaREST.Context, cluster.ID, linkName, mirrorTopicName) + mirror, err := kafkaREST.CloudClient.ReadKafkaMirrorTopic(linkName, mirrorTopicName) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } list := output.NewList(cmd) - for _, partitionLag := range mirror.MirrorLags { + for _, partitionLag := range mirror.GetMirrorLags().Items { list.Add(&mirrorOut{ - LinkName: mirror.LinkName, - MirrorTopicName: mirror.MirrorTopicName, - SourceTopicName: mirror.SourceTopicName, - MirrorStatus: string(mirror.MirrorStatus), - StatusTimeMs: mirror.StateTimeMs, - Partition: partitionLag.Partition, - PartitionMirrorLag: partitionLag.Lag, - LastSourceFetchOffset: partitionLag.LastSourceFetchOffset, + LinkName: mirror.GetLinkName(), + MirrorTopicName: mirror.GetMirrorTopicName(), + SourceTopicName: mirror.GetSourceTopicName(), + MirrorStatus: string(mirror.GetMirrorStatus()), + StatusTimeMs: mirror.GetStateTimeMs(), + Partition: partitionLag.GetPartition(), + PartitionMirrorLag: partitionLag.GetLag(), + LastSourceFetchOffset: partitionLag.GetLastSourceFetchOffset(), }) } list.Filter([]string{"LinkName", "MirrorTopicName", "Partition", "PartitionMirrorLag", "SourceTopicName", "MirrorStatus", "StatusTimeMs", "LastSourceFetchOffset"}) diff --git a/internal/cmd/kafka/command_mirror_failover.go b/internal/cmd/kafka/command_mirror_failover.go index 9fa1a4e819..f4446c8d69 100644 --- a/internal/cmd/kafka/command_mirror_failover.go +++ b/internal/cmd/kafka/command_mirror_failover.go @@ -1,15 +1,12 @@ package kafka import ( - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) func (c *mirrorCommand) newFailoverCommand() *cobra.Command { @@ -51,26 +48,15 @@ func (c *mirrorCommand) failover(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - failoverMirrorOpt := &kafkarestv3.UpdateKafkaMirrorTopicsFailoverOpts{ - AlterMirrorsRequestData: optional.NewInterface(kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: args}), - ValidateOnly: optional.NewBool(dryRun), - } + alterMirrorsRequestData := kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: &args} - results, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsFailover(kafkaREST.Context, cluster.ID, linkName, failoverMirrorOpt) + results, err := kafkaREST.CloudClient.UpdateKafkaMirrorTopicsFailover(linkName, dryRun, alterMirrorsRequestData) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } return printAlterMirrorResult(cmd, results) diff --git a/internal/cmd/kafka/command_mirror_list.go b/internal/cmd/kafka/command_mirror_list.go index 917d037e97..9d6ebce51e 100644 --- a/internal/cmd/kafka/command_mirror_list.go +++ b/internal/cmd/kafka/command_mirror_list.go @@ -1,20 +1,21 @@ package kafka import ( - "net/http" + "fmt" + "strings" - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" + "github.com/confluentinc/cli/internal/pkg/utils" ) +var allowedMirrorTopicStatusValues = []string{"active", "failed", "paused", "stopped", "pending_stopped"} + func (c *mirrorCommand) newListCommand() *cobra.Command { cmd := &cobra.Command{ Use: "list", @@ -34,7 +35,7 @@ func (c *mirrorCommand) newListCommand() *cobra.Command { } pcmd.AddLinkFlag(cmd, c.AuthenticatedCLICommand) - cmd.Flags().String(mirrorStatusFlagName, "", "Mirror topic status. Can be one of [active, failed, paused, stopped, pending_stopped]. If not specified, list all mirror topics.") + cmd.Flags().String(mirrorStatusFlagName, "", fmt.Sprintf("Mirror topic status. Can be one of %s. If not specified, list all mirror topics.", utils.ArrayToCommaDelimitedString(allowedMirrorTopicStatusValues, "or"))) pcmd.AddClusterFlag(cmd, c.AuthenticatedCLICommand) pcmd.AddContextFlag(cmd, c.CLICommand) pcmd.AddEnvironmentFlag(cmd, c.AuthenticatedCLICommand) @@ -55,53 +56,47 @@ func (c *mirrorCommand) list(cmd *cobra.Command, _ []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - mirrorStatusOpt := optional.EmptyInterface() + var mirrorTopicStatus *kafkarestv3.MirrorTopicStatus if mirrorStatus != "" { - mirrorStatusOpt = optional.NewInterface(kafkarestv3.MirrorTopicStatus(mirrorStatus)) + mirrorTopicStatus, err = kafkarestv3.NewMirrorTopicStatusFromValue(strings.ToUpper(mirrorStatus)) + if err != nil { + return err + } } - var listMirrorTopicsResponseDataList kafkarestv3.ListMirrorTopicsResponseDataList - var httpResp *http.Response - + var mirrors kafkarestv3.ListMirrorTopicsResponseDataList if linkName == "" { - opts := &kafkarestv3.ListKafkaMirrorTopicsOpts{MirrorStatus: mirrorStatusOpt} - listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopics(kafkaREST.Context, cluster.ID, opts) + mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopics(mirrorTopicStatus) + if err != nil { + return err + } } else { - opts := &kafkarestv3.ListKafkaMirrorTopicsUnderLinkOpts{MirrorStatus: mirrorStatusOpt} - listMirrorTopicsResponseDataList, httpResp, err = kafkaREST.Client.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(kafkaREST.Context, cluster.ID, linkName, opts) - } - if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + mirrors, err = kafkaREST.CloudClient.ListKafkaMirrorTopicsUnderLink(linkName, mirrorTopicStatus) + if err != nil { + return err + } } list := output.NewList(cmd) - for _, mirror := range listMirrorTopicsResponseDataList.Data { + for _, mirror := range mirrors.GetData() { var maxLag int64 = 0 - for _, mirrorLag := range mirror.MirrorLags { - if mirrorLag.Lag > maxLag { - maxLag = mirrorLag.Lag + for _, mirrorLag := range mirror.GetMirrorLags().Items { + if lag := mirrorLag.GetLag(); lag > maxLag { + maxLag = lag } } list.Add(&mirrorOut{ - LinkName: mirror.LinkName, - MirrorTopicName: mirror.MirrorTopicName, - SourceTopicName: mirror.SourceTopicName, - MirrorStatus: string(mirror.MirrorStatus), - StatusTimeMs: mirror.StateTimeMs, - NumPartition: mirror.NumPartitions, + LinkName: mirror.GetLinkName(), + MirrorTopicName: mirror.GetMirrorTopicName(), + SourceTopicName: mirror.GetSourceTopicName(), + MirrorStatus: string(mirror.GetMirrorStatus()), + StatusTimeMs: mirror.GetStateTimeMs(), + NumPartition: mirror.GetNumPartitions(), MaxPerPartitionMirrorLag: maxLag, }) } diff --git a/internal/cmd/kafka/command_mirror_pause.go b/internal/cmd/kafka/command_mirror_pause.go index 7fe3cbc313..21d7ba22a7 100644 --- a/internal/cmd/kafka/command_mirror_pause.go +++ b/internal/cmd/kafka/command_mirror_pause.go @@ -1,15 +1,12 @@ package kafka import ( - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) func (c *mirrorCommand) newPauseCommand() *cobra.Command { @@ -51,26 +48,15 @@ func (c *mirrorCommand) pause(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - pauseMirrorOpt := &kafkarestv3.UpdateKafkaMirrorTopicsPauseOpts{ - AlterMirrorsRequestData: optional.NewInterface(kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: args}), - ValidateOnly: optional.NewBool(dryRun), - } + alterMirrorsRequestData := kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: &args} - results, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsPause(kafkaREST.Context, cluster.ID, linkName, pauseMirrorOpt) + results, err := kafkaREST.CloudClient.UpdateKafkaMirrorTopicsPause(linkName, dryRun, alterMirrorsRequestData) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } return printAlterMirrorResult(cmd, results) diff --git a/internal/cmd/kafka/command_mirror_promote.go b/internal/cmd/kafka/command_mirror_promote.go index bf09405161..8527b32bcf 100644 --- a/internal/cmd/kafka/command_mirror_promote.go +++ b/internal/cmd/kafka/command_mirror_promote.go @@ -1,15 +1,12 @@ package kafka import ( - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" ) func (c *mirrorCommand) newPromoteCommand() *cobra.Command { @@ -51,26 +48,15 @@ func (c *mirrorCommand) promote(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - promoteMirrorOpt := &kafkarestv3.UpdateKafkaMirrorTopicsPromoteOpts{ - AlterMirrorsRequestData: optional.NewInterface(kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: args}), - ValidateOnly: optional.NewBool(dryRun), - } + alterMirrorsRequestData := kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: &args} - results, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsPromote(kafkaREST.Context, cluster.ID, linkName, promoteMirrorOpt) + results, err := kafkaREST.CloudClient.UpdateKafkaMirrorTopicsPromote(linkName, dryRun, alterMirrorsRequestData) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } return printAlterMirrorResult(cmd, results) diff --git a/internal/cmd/kafka/command_mirror_resume.go b/internal/cmd/kafka/command_mirror_resume.go index ec6ad8140a..4dfd59ae51 100644 --- a/internal/cmd/kafka/command_mirror_resume.go +++ b/internal/cmd/kafka/command_mirror_resume.go @@ -1,17 +1,14 @@ package kafka import ( - "fmt" + "strconv" - "github.com/antihax/optional" "github.com/spf13/cobra" - "github.com/confluentinc/kafka-rest-sdk-go/kafkarestv3" + kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/examples" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -54,26 +51,15 @@ func (c *mirrorCommand) resume(cmd *cobra.Command, args []string) error { } kafkaREST, err := c.GetKafkaREST() - if kafkaREST == nil { - if err != nil { - return err - } - return errors.New(errors.RestProxyNotAvailableMsg) - } - - cluster, err := c.Context.GetKafkaClusterForCommand() if err != nil { return err } - resumeMirrorOpt := &kafkarestv3.UpdateKafkaMirrorTopicsResumeOpts{ - AlterMirrorsRequestData: optional.NewInterface(kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: args}), - ValidateOnly: optional.NewBool(dryRun), - } + alterMirrorsRequestData := kafkarestv3.AlterMirrorsRequestData{MirrorTopicNames: &args} - results, httpResp, err := kafkaREST.Client.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsResume(kafkaREST.Context, cluster.ID, linkName, resumeMirrorOpt) + results, err := kafkaREST.CloudClient.UpdateKafkaMirrorTopicsResume(linkName, dryRun, alterMirrorsRequestData) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } return printAlterMirrorResult(cmd, results) @@ -81,21 +67,18 @@ func (c *mirrorCommand) resume(cmd *cobra.Command, args []string) error { func printAlterMirrorResult(cmd *cobra.Command, results kafkarestv3.AlterMirrorStatusResponseDataList) error { list := output.NewList(cmd) - for _, result := range results.Data { - var errorMessage string - if result.ErrorMessage != nil { - errorMessage = *result.ErrorMessage - } + for _, result := range results.GetData() { + errorMessage := result.GetErrorMessage() var errorCode string - if result.ErrorCode != nil { - errorCode = fmt.Sprint(*result.ErrorCode) + if code := result.GetErrorCode(); code != 0 { + errorCode = strconv.Itoa(int(code)) } // fatal error if errorMessage != "" { list.Add(&mirrorOut{ - MirrorTopicName: result.MirrorTopicName, + MirrorTopicName: result.GetMirrorTopicName(), Partition: -1, ErrorMessage: errorMessage, ErrorCode: errorCode, @@ -105,14 +88,14 @@ func printAlterMirrorResult(cmd *cobra.Command, results kafkarestv3.AlterMirrorS continue } - for _, partitionLag := range result.MirrorLags { + for _, partitionLag := range result.GetMirrorLags().Items { list.Add(&mirrorOut{ - MirrorTopicName: result.MirrorTopicName, - Partition: partitionLag.Partition, + MirrorTopicName: result.GetMirrorTopicName(), + Partition: partitionLag.GetPartition(), ErrorMessage: errorMessage, ErrorCode: errorCode, - PartitionMirrorLag: partitionLag.Lag, - LastSourceFetchOffset: partitionLag.LastSourceFetchOffset, + PartitionMirrorLag: partitionLag.GetLag(), + LastSourceFetchOffset: partitionLag.GetLastSourceFetchOffset(), }) } } diff --git a/internal/cmd/kafka/command_topic_create.go b/internal/cmd/kafka/command_topic_create.go index e4a1df1b75..7376f1ee13 100644 --- a/internal/cmd/kafka/command_topic_create.go +++ b/internal/cmd/kafka/command_topic_create.go @@ -73,17 +73,12 @@ func (c *command) create(cmd *cobra.Command, args []string) error { return err } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } @@ -108,7 +103,7 @@ func (c *command) create(cmd *cobra.Command, args []string) error { data.PartitionsCount = utils.Int32Ptr(int32(partitions)) } - _, httpResp, err := kafkaREST.CloudClient.CreateKafkaTopic(kafkaClusterConfig.ID, data) + _, httpResp, err := kafkaREST.CloudClient.CreateKafkaTopic(data) if err != nil { restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err) if parseErr == nil && restErr.Code == ccloudv2.BadRequestErrorCode { @@ -117,9 +112,10 @@ func (c *command) create(cmd *cobra.Command, args []string) error { if ifNotExists { return nil } + clusterId := kafkaREST.GetClusterId() return errors.NewErrorWithSuggestions( - fmt.Sprintf(errors.TopicExistsErrorMsg, topicName, kafkaClusterConfig.ID), - fmt.Sprintf(errors.TopicExistsSuggestions, kafkaClusterConfig.ID, kafkaClusterConfig.ID)) + fmt.Sprintf(errors.TopicExistsErrorMsg, topicName, clusterId), + fmt.Sprintf(errors.TopicExistsSuggestions, clusterId, clusterId)) } // Print partition limit error w/ suggestion diff --git a/internal/cmd/kafka/command_topic_delete.go b/internal/cmd/kafka/command_topic_delete.go index ba02c29a30..c6fb18532c 100644 --- a/internal/cmd/kafka/command_topic_delete.go +++ b/internal/cmd/kafka/command_topic_delete.go @@ -42,22 +42,17 @@ func (c *command) newDeleteCommand() *cobra.Command { func (c *command) delete(cmd *cobra.Command, args []string) error { topicName := args[0] - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } // Check if topic exists - if _, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(kafkaClusterConfig.ID, topicName); err != nil { + if _, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(topicName); err != nil { return err } @@ -66,7 +61,7 @@ func (c *command) delete(cmd *cobra.Command, args []string) error { return err } - httpResp, err := kafkaREST.CloudClient.DeleteKafkaTopic(kafkaClusterConfig.ID, topicName) + httpResp, err := kafkaREST.CloudClient.DeleteKafkaTopic(topicName) if err != nil { restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err) if parseErr == nil { diff --git a/internal/cmd/kafka/command_topic_describe.go b/internal/cmd/kafka/command_topic_describe.go index 0fdf89a404..306a0979f5 100644 --- a/internal/cmd/kafka/command_topic_describe.go +++ b/internal/cmd/kafka/command_topic_describe.go @@ -39,26 +39,21 @@ func (c *command) newDescribeCommand() *cobra.Command { func (c *command) describe(cmd *cobra.Command, args []string) error { topicName := args[0] - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } - configsResp, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(kafkaClusterConfig.ID, topicName) + configsResp, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(topicName) if err != nil { return err } - topic, httpResp, err := kafkaREST.CloudClient.GetKafkaTopic(kafkaClusterConfig.ID, topicName) + topic, httpResp, err := kafkaREST.CloudClient.GetKafkaTopic(topicName) if err != nil { if restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err); parseErr == nil && restErr.Code == ccloudv2.UnknownTopicOrPartitionErrorCode { return fmt.Errorf(errors.UnknownTopicErrorMsg, topicName) diff --git a/internal/cmd/kafka/command_topic_list.go b/internal/cmd/kafka/command_topic_list.go index 8220e6a294..d7e55b499c 100644 --- a/internal/cmd/kafka/command_topic_list.go +++ b/internal/cmd/kafka/command_topic_list.go @@ -6,7 +6,6 @@ import ( kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" ) @@ -45,24 +44,19 @@ func (c *command) list(cmd *cobra.Command, _ []string) error { } func (c *command) getTopics() ([]kafkarestv3.TopicData, error) { - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return nil, err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return nil, err } - kafkaREST, err := c.GetKafkaREST() + topics, err := kafkaREST.CloudClient.ListKafkaTopics() if err != nil { return nil, err } - topics, httpResp, err := kafkaREST.CloudClient.ListKafkaTopics(kafkaClusterConfig.ID) - if err != nil { - return nil, kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) - } - return topics.Data, nil } diff --git a/internal/cmd/kafka/command_topic_update.go b/internal/cmd/kafka/command_topic_update.go index 4fe05645c0..68ce5f7403 100644 --- a/internal/cmd/kafka/command_topic_update.go +++ b/internal/cmd/kafka/command_topic_update.go @@ -74,17 +74,12 @@ func (c *command) update(cmd *cobra.Command, args []string) error { return err } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() + kafkaREST, err := c.GetKafkaREST() if err != nil { return err } - if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil { - return err - } - - kafkaREST, err := c.GetKafkaREST() - if err != nil { + if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil { return err } @@ -97,7 +92,7 @@ func (c *command) update(cmd *cobra.Command, args []string) error { data := toAlterConfigBatchRequestData(configMap) data.ValidateOnly = &dryRun - httpResp, err := kafkaREST.CloudClient.UpdateKafkaTopicConfigBatch(kafkaClusterConfig.ID, topicName, data) + httpResp, err := kafkaREST.CloudClient.UpdateKafkaTopicConfigBatch(topicName, data) if err != nil { restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err) if parseErr == nil { @@ -121,16 +116,16 @@ func (c *command) update(cmd *cobra.Command, args []string) error { if err != nil { return err } - updateResp, httpResp, err := kafkaREST.CloudClient.UpdateKafkaTopicPartitionCount(kafkaClusterConfig.ID, topicName, kafkarestv3.UpdatePartitionCountRequestData{PartitionsCount: int32(updateNumPartitionsInt)}) + updateResp, err := kafkaREST.CloudClient.UpdateKafkaTopicPartitionCount(topicName, kafkarestv3.UpdatePartitionCountRequestData{PartitionsCount: int32(updateNumPartitionsInt)}) if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) + return err } configsValues[numPartitionsKey] = fmt.Sprint(updateResp.PartitionsCount) partitionsKafkaRestConfig := kafkarestv3.AlterConfigBatchRequestDataData{Name: numPartitionsKey} kafkaRestConfigs.Data = append(kafkaRestConfigs.Data, partitionsKafkaRestConfig) } - configsResp, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(kafkaClusterConfig.ID, topicName) + configsResp, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(topicName) if err != nil { return err } diff --git a/internal/cmd/kafka/utils.go b/internal/cmd/kafka/utils.go index bb81688d4b..e6df58bb67 100644 --- a/internal/cmd/kafka/utils.go +++ b/internal/cmd/kafka/utils.go @@ -9,7 +9,6 @@ import ( "github.com/confluentinc/cli/internal/pkg/ccloudv2" "github.com/confluentinc/cli/internal/pkg/ccstructs" - pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" "github.com/confluentinc/cli/internal/pkg/kafkarest" ) @@ -82,22 +81,6 @@ func handleOpenApiError(httpResp *_nethttp.Response, err error, client *cpkafkar return err } -func getKafkaRestProxyAndLkcId(c *pcmd.AuthenticatedCLICommand) (*pcmd.KafkaREST, string, error) { - kafkaREST, err := c.GetKafkaREST() - if err != nil { - return nil, "", err - } - if kafkaREST == nil { - return nil, "", errors.New(errors.RestProxyNotAvailable) - } - // Kafka REST is available - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() - if err != nil { - return nil, "", err - } - return kafkaREST, kafkaClusterConfig.ID, nil -} - func isClusterResizeInProgress(currentCluster *cmkv2.CmkV2Cluster) error { if currentCluster.Status.Phase == ccloudv2.StatusProvisioning { return errors.New(errors.KafkaClusterStillProvisioningErrorMsg) diff --git a/internal/cmd/ksql/command_cluster_configureacls.go b/internal/cmd/ksql/command_cluster_configureacls.go index 794abdd953..451648d3b6 100644 --- a/internal/cmd/ksql/command_cluster_configureacls.go +++ b/internal/cmd/ksql/command_cluster_configureacls.go @@ -13,7 +13,6 @@ import ( "github.com/confluentinc/cli/internal/pkg/ccstructs" pcmd "github.com/confluentinc/cli/internal/pkg/cmd" "github.com/confluentinc/cli/internal/pkg/errors" - "github.com/confluentinc/cli/internal/pkg/kafkarest" "github.com/confluentinc/cli/internal/pkg/output" "github.com/confluentinc/cli/internal/pkg/resource" ) @@ -82,16 +81,10 @@ func (c *ksqlCommand) configureACLs(cmd *cobra.Command, args []string) error { return err } - kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand() - if err != nil { + if err := kafkaREST.CloudClient.BatchCreateKafkaAcls(getCreateAclRequestDataList(bindings)); err != nil { return err } - httpResp, err := kafkaREST.CloudClient.BatchCreateKafkaAcls(kafkaClusterConfig.ID, getCreateAclRequestDataList(bindings)) - if err != nil { - return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp) - } - return acl.PrintACLs(cmd, bindings) } diff --git a/internal/pkg/ccloudv2/kafkarest.go b/internal/pkg/ccloudv2/kafkarest.go index b8b2c2d8c8..2b2393f85a 100644 --- a/internal/pkg/ccloudv2/kafkarest.go +++ b/internal/pkg/ccloudv2/kafkarest.go @@ -20,9 +20,10 @@ const ( type KafkaRestClient struct { *kafkarestv3.APIClient AuthToken string + ClusterId string } -func NewKafkaRestClient(url, userAgent string, unsafeTrace bool, authToken string) *KafkaRestClient { +func NewKafkaRestClient(url, clusterId, userAgent, authToken string, unsafeTrace bool) *KafkaRestClient { cfg := kafkarestv3.NewConfiguration() cfg.Debug = unsafeTrace cfg.HTTPClient = NewRetryableHttpClient(unsafeTrace) @@ -32,6 +33,7 @@ func NewKafkaRestClient(url, userAgent string, unsafeTrace bool, authToken strin return &KafkaRestClient{ APIClient: kafkarestv3.NewAPIClient(cfg), AuthToken: authToken, + ClusterId: clusterId, } } @@ -43,31 +45,33 @@ func (c *KafkaRestClient) context() context.Context { return context.WithValue(context.Background(), kafkarestv3.ContextAccessToken, c.AuthToken) } -func (c *KafkaRestClient) GetKafkaClusterConfig(clusterId, name string) (kafkarestv3.ClusterConfigData, error) { - res, httpResp, err := c.ConfigsV3Api.GetKafkaClusterConfig(c.context(), clusterId, name).Execute() +func (c *KafkaRestClient) GetKafkaClusterConfig(name string) (kafkarestv3.ClusterConfigData, error) { + res, httpResp, err := c.ConfigsV3Api.GetKafkaClusterConfig(c.context(), c.ClusterId, name).Execute() return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaClusterConfigs(clusterId string) (kafkarestv3.ClusterConfigDataList, error) { - res, httpResp, err := c.ConfigsV3Api.ListKafkaClusterConfigs(c.context(), clusterId).Execute() +func (c *KafkaRestClient) ListKafkaClusterConfigs() (kafkarestv3.ClusterConfigDataList, error) { + res, httpResp, err := c.ConfigsV3Api.ListKafkaClusterConfigs(c.context(), c.ClusterId).Execute() return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) UpdateKafkaClusterConfigs(clusterId string, req kafkarestv3.AlterConfigBatchRequestData) error { - httpResp, err := c.ConfigsV3Api.UpdateKafkaClusterConfigs(c.context(), clusterId).AlterConfigBatchRequestData(req).Execute() +func (c *KafkaRestClient) UpdateKafkaClusterConfigs(req kafkarestv3.AlterConfigBatchRequestData) error { + httpResp, err := c.ConfigsV3Api.UpdateKafkaClusterConfigs(c.context(), c.ClusterId).AlterConfigBatchRequestData(req).Execute() return kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) BatchCreateKafkaAcls(clusterId string, list kafkarestv3.CreateAclRequestDataList) (*http.Response, error) { - return c.ACLV3Api.BatchCreateKafkaAcls(c.context(), clusterId).CreateAclRequestDataList(list).Execute() +func (c *KafkaRestClient) BatchCreateKafkaAcls(list kafkarestv3.CreateAclRequestDataList) error { + httpResp, err := c.ACLV3Api.BatchCreateKafkaAcls(c.context(), c.ClusterId).CreateAclRequestDataList(list).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) CreateKafkaAcls(clusterId string, data kafkarestv3.CreateAclRequestData) (*http.Response, error) { - return c.ACLV3Api.CreateKafkaAcls(c.context(), clusterId).CreateAclRequestData(data).Execute() +func (c *KafkaRestClient) CreateKafkaAcls(data kafkarestv3.CreateAclRequestData) error { + httpResp, err := c.ACLV3Api.CreateKafkaAcls(c.context(), c.ClusterId).CreateAclRequestData(data).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) GetKafkaAcls(clusterId string, acl *ccstructs.ACLBinding) (kafkarestv3.AclDataList, *http.Response, error) { - req := c.ACLV3Api.GetKafkaAcls(c.context(), clusterId).Host(acl.GetEntry().GetHost()).Principal(acl.GetEntry().GetPrincipal()).ResourceName(acl.GetPattern().GetName()) +func (c *KafkaRestClient) GetKafkaAcls(acl *ccstructs.ACLBinding) (kafkarestv3.AclDataList, error) { + req := c.ACLV3Api.GetKafkaAcls(c.context(), c.ClusterId).Host(acl.GetEntry().GetHost()).Principal(acl.GetEntry().GetPrincipal()).ResourceName(acl.GetPattern().GetName()) if acl.GetEntry().GetOperation() != ccstructs.ACLOperations_UNKNOWN { req = req.Operation(acl.GetEntry().GetOperation().String()) @@ -85,11 +89,12 @@ func (c *KafkaRestClient) GetKafkaAcls(clusterId string, acl *ccstructs.ACLBindi req = req.ResourceType(kafkarestv3.AclResourceType(acl.GetPattern().GetResourceType().String())) } - return req.Execute() + res, httpResp, err := req.Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) DeleteKafkaAcls(clusterId string, acl *ccstructs.ACLFilter) (kafkarestv3.InlineResponse200, *http.Response, error) { - req := c.ACLV3Api.DeleteKafkaAcls(c.context(), clusterId).Host(acl.EntryFilter.GetHost()).Principal(acl.EntryFilter.GetPrincipal()).ResourceName(acl.PatternFilter.GetName()) +func (c *KafkaRestClient) DeleteKafkaAcls(acl *ccstructs.ACLFilter) (kafkarestv3.InlineResponse200, error) { + req := c.ACLV3Api.DeleteKafkaAcls(c.context(), c.ClusterId).Host(acl.EntryFilter.GetHost()).Principal(acl.EntryFilter.GetPrincipal()).ResourceName(acl.PatternFilter.GetName()) if acl.EntryFilter.GetOperation() != ccstructs.ACLOperations_UNKNOWN { req = req.Operation(acl.EntryFilter.GetOperation().String()) @@ -107,39 +112,94 @@ func (c *KafkaRestClient) DeleteKafkaAcls(clusterId string, acl *ccstructs.ACLFi req = req.ResourceType(kafkarestv3.AclResourceType(acl.PatternFilter.GetResourceType().String())) } - return req.Execute() + res, httpResp, err := req.Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) CreateKafkaLink(linkName string, validateLink, validateOnly bool, data kafkarestv3.CreateLinkRequestData) error { + httpResp, err := c.ClusterLinkingV3Api.CreateKafkaLink(c.context(), c.ClusterId).LinkName(linkName).ValidateLink(validateLink).ValidateOnly(validateOnly).CreateLinkRequestData(data).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) CreateKafkaMirrorTopic(linkName string, data kafkarestv3.CreateMirrorTopicRequestData) error { + httpResp, err := c.ClusterLinkingV3Api.CreateKafkaMirrorTopic(c.context(), c.ClusterId, linkName).CreateMirrorTopicRequestData(data).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) ListKafkaMirrorTopics(status *kafkarestv3.MirrorTopicStatus) (kafkarestv3.ListMirrorTopicsResponseDataList, error) { + req := c.ClusterLinkingV3Api.ListKafkaMirrorTopics(c.context(), c.ClusterId) + + if status != nil { + req = req.MirrorStatus(*status) + } + + res, httpResp, err := req.Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) ListKafkaMirrorTopicsUnderLink(linkName string, status *kafkarestv3.MirrorTopicStatus) (kafkarestv3.ListMirrorTopicsResponseDataList, error) { + req := c.ClusterLinkingV3Api.ListKafkaMirrorTopicsUnderLink(c.context(), c.ClusterId, linkName) + + if status != nil { + req = req.MirrorStatus(*status) + } + + res, httpResp, err := req.Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) ReadKafkaMirrorTopic(linkName, mirrorTopicName string) (kafkarestv3.ListMirrorTopicsResponseData, error) { + res, httpResp, err := c.ClusterLinkingV3Api.ReadKafkaMirrorTopic(c.context(), c.ClusterId, linkName, mirrorTopicName).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) UpdateKafkaMirrorTopicsFailover(linkName string, validateOnly bool, data kafkarestv3.AlterMirrorsRequestData) (kafkarestv3.AlterMirrorStatusResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsFailover(c.context(), c.ClusterId, linkName).ValidateOnly(validateOnly).AlterMirrorsRequestData(data).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) CreateKafkaLink(clusterId, linkName string, validateLink, validateOnly bool, data kafkarestv3.CreateLinkRequestData) (*http.Response, error) { - return c.ClusterLinkingV3Api.CreateKafkaLink(c.context(), clusterId).LinkName(linkName).ValidateLink(validateLink).ValidateOnly(validateOnly).CreateLinkRequestData(data).Execute() +func (c *KafkaRestClient) UpdateKafkaMirrorTopicsPause(linkName string, validateOnly bool, data kafkarestv3.AlterMirrorsRequestData) (kafkarestv3.AlterMirrorStatusResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsPause(c.context(), c.ClusterId, linkName).ValidateOnly(validateOnly).AlterMirrorsRequestData(data).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) CreateKafkaMirrorTopic(clusterId, linkName string, data kafkarestv3.CreateMirrorTopicRequestData) (*http.Response, error) { - return c.ClusterLinkingV3Api.CreateKafkaMirrorTopic(c.context(), clusterId, linkName).CreateMirrorTopicRequestData(data).Execute() +func (c *KafkaRestClient) UpdateKafkaMirrorTopicsPromote(linkName string, validateOnly bool, data kafkarestv3.AlterMirrorsRequestData) (kafkarestv3.AlterMirrorStatusResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsPromote(c.context(), c.ClusterId, linkName).ValidateOnly(validateOnly).AlterMirrorsRequestData(data).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) DeleteKafkaLink(clusterId, linkName string) (*http.Response, error) { - return c.ClusterLinkingV3Api.DeleteKafkaLink(c.context(), clusterId, linkName).Execute() +func (c *KafkaRestClient) UpdateKafkaMirrorTopicsResume(linkName string, validateOnly bool, data kafkarestv3.AlterMirrorsRequestData) (kafkarestv3.AlterMirrorStatusResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.UpdateKafkaMirrorTopicsResume(c.context(), c.ClusterId, linkName).ValidateOnly(validateOnly).AlterMirrorsRequestData(data).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) GetKafkaLink(clusterId, linkName string) (kafkarestv3.ListLinksResponseData, *http.Response, error) { - return c.ClusterLinkingV3Api.GetKafkaLink(c.context(), clusterId, linkName).Execute() +func (c *KafkaRestClient) DeleteKafkaLink(linkName string) error { + httpResp, err := c.ClusterLinkingV3Api.DeleteKafkaLink(c.context(), c.ClusterId, linkName).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaLinkConfigs(clusterId, linkName string) (kafkarestv3.ListLinkConfigsResponseDataList, *http.Response, error) { - return c.ClusterLinkingV3Api.ListKafkaLinkConfigs(c.context(), clusterId, linkName).Execute() +func (c *KafkaRestClient) GetKafkaLink(linkName string) (kafkarestv3.ListLinksResponseData, error) { + res, httpResp, err := c.ClusterLinkingV3Api.GetKafkaLink(c.context(), c.ClusterId, linkName).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaLinks(clusterId string) (kafkarestv3.ListLinksResponseDataList, *http.Response, error) { - return c.ClusterLinkingV3Api.ListKafkaLinks(c.context(), clusterId).Execute() +func (c *KafkaRestClient) ListKafkaLinkConfigs(linkName string) (kafkarestv3.ListLinkConfigsResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.ListKafkaLinkConfigs(c.context(), c.ClusterId, linkName).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) UpdateKafkaLinkConfigBatch(clusterId, linkName string, data kafkarestv3.AlterConfigBatchRequestData) (*http.Response, error) { - return c.ClusterLinkingV3Api.UpdateKafkaLinkConfigBatch(c.context(), clusterId, linkName).AlterConfigBatchRequestData(data).Execute() +func (c *KafkaRestClient) ListKafkaLinks() (kafkarestv3.ListLinksResponseDataList, error) { + res, httpResp, err := c.ClusterLinkingV3Api.ListKafkaLinks(c.context(), c.ClusterId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaTopicConfigs(clusterId, topicName string) (kafkarestv3.TopicConfigDataList, error) { - res, httpResp, err := c.ConfigsV3Api.ListKafkaTopicConfigs(c.context(), clusterId, topicName).Execute() +func (c *KafkaRestClient) UpdateKafkaLinkConfigBatch(linkName string, data kafkarestv3.AlterConfigBatchRequestData) error { + httpResp, err := c.ClusterLinkingV3Api.UpdateKafkaLinkConfigBatch(c.context(), c.ClusterId, linkName).AlterConfigBatchRequestData(data).Execute() + return kafkarest.NewError(c.GetUrl(), err, httpResp) +} + +func (c *KafkaRestClient) ListKafkaTopicConfigs(topicName string) (kafkarestv3.TopicConfigDataList, error) { + res, httpResp, err := c.ConfigsV3Api.ListKafkaTopicConfigs(c.context(), c.ClusterId, topicName).Execute() if err != nil { if restErr, err := kafkarest.ParseOpenAPIErrorCloud(err); err == nil { if restErr.Code == UnknownTopicOrPartitionErrorCode { @@ -150,54 +210,62 @@ func (c *KafkaRestClient) ListKafkaTopicConfigs(clusterId, topicName string) (ka return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) UpdateKafkaTopicConfigBatch(clusterId, topicName string, data kafkarestv3.AlterConfigBatchRequestData) (*http.Response, error) { - return c.ConfigsV3Api.UpdateKafkaTopicConfigBatch(c.context(), clusterId, topicName).AlterConfigBatchRequestData(data).Execute() +func (c *KafkaRestClient) UpdateKafkaTopicConfigBatch(topicName string, data kafkarestv3.AlterConfigBatchRequestData) (*http.Response, error) { + return c.ConfigsV3Api.UpdateKafkaTopicConfigBatch(c.context(), c.ClusterId, topicName).AlterConfigBatchRequestData(data).Execute() } -func (c *KafkaRestClient) GetKafkaConsumerGroup(clusterId, consumerGroupId string) (kafkarestv3.ConsumerGroupData, *http.Response, error) { - return c.ConsumerGroupV3Api.GetKafkaConsumerGroup(c.context(), clusterId, consumerGroupId).Execute() +func (c *KafkaRestClient) GetKafkaConsumerGroup(consumerGroupId string) (kafkarestv3.ConsumerGroupData, error) { + res, httpResp, err := c.ConsumerGroupV3Api.GetKafkaConsumerGroup(c.context(), c.ClusterId, consumerGroupId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) GetKafkaConsumerGroupLagSummary(clusterId, consumerGroupId string) (kafkarestv3.ConsumerGroupLagSummaryData, *http.Response, error) { - return c.ConsumerGroupV3Api.GetKafkaConsumerGroupLagSummary(c.context(), clusterId, consumerGroupId).Execute() +func (c *KafkaRestClient) GetKafkaConsumerGroupLagSummary(consumerGroupId string) (kafkarestv3.ConsumerGroupLagSummaryData, error) { + res, httpResp, err := c.ConsumerGroupV3Api.GetKafkaConsumerGroupLagSummary(c.context(), c.ClusterId, consumerGroupId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaConsumerGroups(clusterId string) (kafkarestv3.ConsumerGroupDataList, *http.Response, error) { - return c.ConsumerGroupV3Api.ListKafkaConsumerGroups(c.context(), clusterId).Execute() +func (c *KafkaRestClient) ListKafkaConsumerGroups() (kafkarestv3.ConsumerGroupDataList, error) { + res, httpResp, err := c.ConsumerGroupV3Api.ListKafkaConsumerGroups(c.context(), c.ClusterId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaConsumerLags(clusterId, consumerGroupId string) (kafkarestv3.ConsumerLagDataList, *http.Response, error) { - return c.ConsumerGroupV3Api.ListKafkaConsumerLags(c.context(), clusterId, consumerGroupId).Execute() +func (c *KafkaRestClient) ListKafkaConsumerLags(consumerGroupId string) (kafkarestv3.ConsumerLagDataList, error) { + res, httpResp, err := c.ConsumerGroupV3Api.ListKafkaConsumerLags(c.context(), c.ClusterId, consumerGroupId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaConsumers(clusterId, consumerGroupId string) (kafkarestv3.ConsumerDataList, *http.Response, error) { - return c.ConsumerGroupV3Api.ListKafkaConsumers(c.context(), clusterId, consumerGroupId).Execute() +func (c *KafkaRestClient) ListKafkaConsumers(consumerGroupId string) (kafkarestv3.ConsumerDataList, error) { + res, httpResp, err := c.ConsumerGroupV3Api.ListKafkaConsumers(c.context(), c.ClusterId, consumerGroupId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) GetKafkaConsumerLag(clusterId, consumerGroupId, topicName string, partitionId int32) (kafkarestv3.ConsumerLagData, *http.Response, error) { - return c.ConsumerGroupV3Api.GetKafkaConsumerLag(c.context(), clusterId, consumerGroupId, topicName, partitionId).Execute() +func (c *KafkaRestClient) GetKafkaConsumerLag(consumerGroupId, topicName string, partitionId int32) (kafkarestv3.ConsumerLagData, error) { + res, httpResp, err := c.ConsumerGroupV3Api.GetKafkaConsumerLag(c.context(), c.ClusterId, consumerGroupId, topicName, partitionId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) ListKafkaPartitions(clusterId, topicName string) (kafkarestv3.PartitionDataList, *http.Response, error) { - return c.PartitionV3Api.ListKafkaPartitions(c.context(), clusterId, topicName).Execute() +func (c *KafkaRestClient) ListKafkaPartitions(topicName string) (kafkarestv3.PartitionDataList, *http.Response, error) { + return c.PartitionV3Api.ListKafkaPartitions(c.context(), c.ClusterId, topicName).Execute() } -func (c *KafkaRestClient) CreateKafkaTopic(clusterId string, data kafkarestv3.CreateTopicRequestData) (kafkarestv3.TopicData, *http.Response, error) { - return c.TopicV3Api.CreateKafkaTopic(c.context(), clusterId).CreateTopicRequestData(data).Execute() +func (c *KafkaRestClient) CreateKafkaTopic(data kafkarestv3.CreateTopicRequestData) (kafkarestv3.TopicData, *http.Response, error) { + return c.TopicV3Api.CreateKafkaTopic(c.context(), c.ClusterId).CreateTopicRequestData(data).Execute() } -func (c *KafkaRestClient) DeleteKafkaTopic(clusterId, topicName string) (*http.Response, error) { - return c.TopicV3Api.DeleteKafkaTopic(c.context(), clusterId, topicName).Execute() +func (c *KafkaRestClient) DeleteKafkaTopic(topicName string) (*http.Response, error) { + return c.TopicV3Api.DeleteKafkaTopic(c.context(), c.ClusterId, topicName).Execute() } -func (c *KafkaRestClient) ListKafkaTopics(clusterId string) (kafkarestv3.TopicDataList, *http.Response, error) { - return c.TopicV3Api.ListKafkaTopics(c.context(), clusterId).Execute() +func (c *KafkaRestClient) ListKafkaTopics() (kafkarestv3.TopicDataList, error) { + res, httpResp, err := c.TopicV3Api.ListKafkaTopics(c.context(), c.ClusterId).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) UpdateKafkaTopicPartitionCount(clusterId, topicName string, updatePartitionCountRequestData kafkarestv3.UpdatePartitionCountRequestData) (kafkarestv3.TopicData, *http.Response, error) { - return c.TopicV3Api.UpdatePartitionCountKafkaTopic(c.context(), clusterId, topicName).UpdatePartitionCountRequestData(updatePartitionCountRequestData).Execute() +func (c *KafkaRestClient) UpdateKafkaTopicPartitionCount(topicName string, updatePartitionCountRequestData kafkarestv3.UpdatePartitionCountRequestData) (kafkarestv3.TopicData, error) { + res, httpResp, err := c.TopicV3Api.UpdatePartitionCountKafkaTopic(c.context(), c.ClusterId, topicName).UpdatePartitionCountRequestData(updatePartitionCountRequestData).Execute() + return res, kafkarest.NewError(c.GetUrl(), err, httpResp) } -func (c *KafkaRestClient) GetKafkaTopic(clusterId, topicName string) (kafkarestv3.TopicData, *http.Response, error) { - return c.TopicV3Api.GetKafkaTopic(c.context(), clusterId, topicName).Execute() +func (c *KafkaRestClient) GetKafkaTopic(topicName string) (kafkarestv3.TopicData, *http.Response, error) { + return c.TopicV3Api.GetKafkaTopic(c.context(), c.ClusterId, topicName).Execute() } diff --git a/internal/pkg/cmd/flags.go b/internal/pkg/cmd/flags.go index 5f10b01a3e..4fb517ff79 100644 --- a/internal/pkg/cmd/flags.go +++ b/internal/pkg/cmd/flags.go @@ -432,16 +432,11 @@ func AddLinkFlag(cmd *cobra.Command, command *AuthenticatedCLICommand) { func AutocompleteLinks(command *AuthenticatedCLICommand) []string { kafkaREST, err := command.GetKafkaREST() - if err != nil || kafkaREST == nil { - return nil - } - - kafkaClusterConfig, err := command.Context.GetKafkaClusterForCommand() if err != nil { return nil } - links, _, err := kafkaREST.CloudClient.ListKafkaLinks(kafkaClusterConfig.ID) + links, err := kafkaREST.CloudClient.ListKafkaLinks() if err != nil { return nil } diff --git a/internal/pkg/cmd/kafka_rest.go b/internal/pkg/cmd/kafka_rest.go index 737e1a9360..3523598713 100644 --- a/internal/pkg/cmd/kafka_rest.go +++ b/internal/pkg/cmd/kafka_rest.go @@ -14,10 +14,9 @@ type KafkaREST struct { Client *kafkarestv3.APIClient } -func NewKafkaREST(ctx context.Context, cloudClient *ccloudv2.KafkaRestClient, client *kafkarestv3.APIClient) *KafkaREST { - return &KafkaREST{ - Context: ctx, - CloudClient: cloudClient, - Client: client, +func (k *KafkaREST) GetClusterId() string { + if k == nil || k.CloudClient == nil { + return "" } + return k.CloudClient.ClusterId } diff --git a/internal/pkg/cmd/prerunner.go b/internal/pkg/cmd/prerunner.go index 3544e645e4..07bbe641a4 100644 --- a/internal/pkg/cmd/prerunner.go +++ b/internal/pkg/cmd/prerunner.go @@ -350,25 +350,24 @@ func (r *PreRun) setCCloudClient(c *AuthenticatedCLICommand) error { if cluster.Status.Phase == ccloudv2.StatusProvisioning { return nil, errors.Errorf(errors.KafkaRestProvisioningErrorMsg, lkc) } - if restEndpoint != "" { - state, err := ctx.AuthenticatedState() - if err != nil { - return nil, err - } - - dataplaneToken, err := pauth.GetDataplaneToken(state, ctx.Platform.Server) - if err != nil { - return nil, err - } - kafkaRest := &KafkaREST{ - Context: context.WithValue(context.Background(), kafkarestv3.ContextAccessToken, dataplaneToken), - CloudClient: ccloudv2.NewKafkaRestClient(restEndpoint, r.Version.UserAgent, unsafeTrace, dataplaneToken), - Client: CreateKafkaRESTClient(restEndpoint, unsafeTrace), - } + if restEndpoint == "" { + return nil, errors.New("Kafka REST is not enabled: the operation is only supported with Kafka REST proxy.") + } - return kafkaRest, nil + state, err := ctx.AuthenticatedState() + if err != nil { + return nil, err + } + dataplaneToken, err := pauth.GetDataplaneToken(state, ctx.Platform.Server) + if err != nil { + return nil, err + } + kafkaRest := &KafkaREST{ + Context: context.WithValue(context.Background(), kafkarestv3.ContextAccessToken, dataplaneToken), + CloudClient: ccloudv2.NewKafkaRestClient(restEndpoint, lkc, r.Version.UserAgent, dataplaneToken, unsafeTrace), + Client: CreateKafkaRESTClient(restEndpoint, unsafeTrace), } - return nil, nil + return kafkaRest, nil }) c.KafkaRESTProvider = &provider return nil diff --git a/internal/pkg/errors/strings.go b/internal/pkg/errors/strings.go index ff8a4ae110..de684ca61e 100644 --- a/internal/pkg/errors/strings.go +++ b/internal/pkg/errors/strings.go @@ -37,9 +37,6 @@ const ( CopyByokAwsPermissionsHeaderMsg = `Copy and append these permissions into the key policy "Statements" field of the ARN in your AWS key management system to authorize access for your Confluent Cloud cluster.` RunByokAzurePermissionsHeaderMsg = "To ensure the key vault has the correct role assignments, please run the following azure-cli command (certified for azure-cli v2.45):" - // kafka consumer-group commands - RestProxyNotAvailable = "Operation not supported: REST proxy is not available.\n" - // kafka topic commands StartingProducerMsg = "Starting Kafka Producer. Use Ctrl-C or Ctrl-D to exit." StoppingConsumerMsg = "Stopping Consumer." @@ -49,9 +46,6 @@ const ( ReadOnlyConfigNotUpdatedMsg = "(read-only configs were not updated)" OmitTopicCountMsg = "The topic count will be omitted as Kafka topics for this cluster could not be retrieved: %v" - // kafka mirror commands - RestProxyNotAvailableMsg = "Kafka REST is not enabled: the operation is only supported with Kafka REST proxy." - // kafka REST proxy MDSTokenNotFoundMsg = "No session token found, please enter user credentials. To avoid being prompted, run `confluent login`." diff --git a/internal/pkg/kafka/utils.go b/internal/pkg/kafka/utils.go index e2d73f16ad..2fb8440e8c 100644 --- a/internal/pkg/kafka/utils.go +++ b/internal/pkg/kafka/utils.go @@ -1,9 +1,5 @@ package kafka -type ListACLsContextKey string - -const Requester ListACLsContextKey = "requester" - var ( Clouds = []string{"aws", "azure", "gcp"} Availabilities = []string{"single-zone", "multi-zone"} diff --git a/internal/pkg/kafkarest/kafkarest.go b/internal/pkg/kafkarest/kafkarest.go index 2bfbcfab8d..881bc3cb38 100644 --- a/internal/pkg/kafkarest/kafkarest.go +++ b/internal/pkg/kafkarest/kafkarest.go @@ -64,6 +64,10 @@ func ParseOpenAPIErrorCloud(err error) (*V3Error, error) { if err := json.Unmarshal(openAPIError.Body(), &decodedError); err != nil { return nil, err } + if decodedError.Message == "" && decodedError.Code == 0 { + // Sometimes the SDK puts the error message in `error` instead of `body` + decodedError.Message = openAPIError.Error() + } return &decodedError, nil } return nil, fmt.Errorf("unexpected type") diff --git a/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-json.golden b/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-json.golden index 067fa5f699..5a5570e868 100644 --- a/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-json.golden +++ b/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-json.golden @@ -9,7 +9,7 @@ }, { "config_name": "dest.cluster.id", - "config_value": "cluster-1", + "config_value": "lkc-describe-topic", "read_only": true, "sensitive": true, "source": "", diff --git a/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-yaml.golden b/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-yaml.golden index b218df9b57..c6864ad030 100644 --- a/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-yaml.golden +++ b/test/fixtures/output/kafka/link/configuration-list-bidirectional-link-yaml.golden @@ -5,7 +5,7 @@ source: source-2 synonyms: [] - config_name: dest.cluster.id - config_value: cluster-1 + config_value: lkc-describe-topic read_only: true sensitive: true source: "" diff --git a/test/fixtures/output/kafka/link/configuration-list-json.golden b/test/fixtures/output/kafka/link/configuration-list-json.golden index fe619849a4..65d6aaee80 100644 --- a/test/fixtures/output/kafka/link/configuration-list-json.golden +++ b/test/fixtures/output/kafka/link/configuration-list-json.golden @@ -9,7 +9,7 @@ }, { "config_name": "dest.cluster.id", - "config_value": "cluster-1", + "config_value": "lkc-describe-topic", "read_only": true, "sensitive": true, "source": "", diff --git a/test/fixtures/output/kafka/link/configuration-list-plain-bidirectional-link.golden b/test/fixtures/output/kafka/link/configuration-list-plain-bidirectional-link.golden index 9b22bfba65..0b6f42c958 100644 --- a/test/fixtures/output/kafka/link/configuration-list-plain-bidirectional-link.golden +++ b/test/fixtures/output/kafka/link/configuration-list-plain-bidirectional-link.golden @@ -1,6 +1,6 @@ - Config Name | Config Value | Read-Only | Sensitive | Source | Synonyms ---------------------------+------------------+-----------+-----------+----------+-------------- - bootstrap.servers | bitcoin.com:8888 | false | false | source-2 | [] - dest.cluster.id | cluster-1 | true | true | | [] - link.mode | BIDIRECTIONAL | false | false | source-2 | [] - replica.fetch.max.bytes | 1048576 | false | false | source-1 | [rfmb bmfr] + Config Name | Config Value | Read-Only | Sensitive | Source | Synonyms +--------------------------+--------------------+-----------+-----------+----------+-------------- + bootstrap.servers | bitcoin.com:8888 | false | false | source-2 | [] + dest.cluster.id | lkc-describe-topic | true | true | | [] + link.mode | BIDIRECTIONAL | false | false | source-2 | [] + replica.fetch.max.bytes | 1048576 | false | false | source-1 | [rfmb bmfr] diff --git a/test/fixtures/output/kafka/link/configuration-list-plain.golden b/test/fixtures/output/kafka/link/configuration-list-plain.golden index ca53771c4d..962b4d49e9 100644 --- a/test/fixtures/output/kafka/link/configuration-list-plain.golden +++ b/test/fixtures/output/kafka/link/configuration-list-plain.golden @@ -1,6 +1,6 @@ - Config Name | Config Value | Read-Only | Sensitive | Source | Synonyms ---------------------------+------------------+-----------+-----------+----------+-------------- - bootstrap.servers | bitcoin.com:8888 | false | false | source-2 | [] - dest.cluster.id | cluster-1 | true | true | | [] - link.mode | DESTINATION | false | false | source-2 | [] - replica.fetch.max.bytes | 1048576 | false | false | source-1 | [rfmb bmfr] + Config Name | Config Value | Read-Only | Sensitive | Source | Synonyms +--------------------------+--------------------+-----------+-----------+----------+-------------- + bootstrap.servers | bitcoin.com:8888 | false | false | source-2 | [] + dest.cluster.id | lkc-describe-topic | true | true | | [] + link.mode | DESTINATION | false | false | source-2 | [] + replica.fetch.max.bytes | 1048576 | false | false | source-1 | [rfmb bmfr] diff --git a/test/fixtures/output/kafka/link/configuration-list-yaml.golden b/test/fixtures/output/kafka/link/configuration-list-yaml.golden index 129f67bc1b..679eec5f15 100644 --- a/test/fixtures/output/kafka/link/configuration-list-yaml.golden +++ b/test/fixtures/output/kafka/link/configuration-list-yaml.golden @@ -5,7 +5,7 @@ source: source-2 synonyms: [] - config_name: dest.cluster.id - config_value: cluster-1 + config_value: lkc-describe-topic read_only: true sensitive: true source: "" diff --git a/test/fixtures/output/kafka/mirror/describe-mirror-json.golden b/test/fixtures/output/kafka/mirror/describe-mirror-json.golden index 4b9ff60f85..8dbecb9d7d 100644 --- a/test/fixtures/output/kafka/mirror/describe-mirror-json.golden +++ b/test/fixtures/output/kafka/mirror/describe-mirror-json.golden @@ -3,7 +3,7 @@ "link_name": "link-1", "mirror_topic_name": "dest-topic-1", "source_topic_name": "src-topic-1", - "mirror_status": "active", + "mirror_status": "ACTIVE", "status_time_ms": 111111111, "partition": 0, "partition_mirror_lag": 142857, @@ -13,7 +13,7 @@ "link_name": "link-1", "mirror_topic_name": "dest-topic-1", "source_topic_name": "src-topic-1", - "mirror_status": "active", + "mirror_status": "ACTIVE", "status_time_ms": 111111111, "partition": 1, "partition_mirror_lag": 285714, @@ -23,7 +23,7 @@ "link_name": "link-1", "mirror_topic_name": "dest-topic-1", "source_topic_name": "src-topic-1", - "mirror_status": "active", + "mirror_status": "ACTIVE", "status_time_ms": 111111111, "partition": 2, "partition_mirror_lag": 571428, diff --git a/test/fixtures/output/kafka/mirror/describe-mirror-yaml.golden b/test/fixtures/output/kafka/mirror/describe-mirror-yaml.golden index 7650d157f6..7e73f9e908 100644 --- a/test/fixtures/output/kafka/mirror/describe-mirror-yaml.golden +++ b/test/fixtures/output/kafka/mirror/describe-mirror-yaml.golden @@ -1,7 +1,7 @@ - link_name: link-1 mirror_topic_name: dest-topic-1 source_topic_name: src-topic-1 - mirror_status: active + mirror_status: ACTIVE status_time_ms: 111111111 partition: 0 partition_mirror_lag: 142857 @@ -9,7 +9,7 @@ - link_name: link-1 mirror_topic_name: dest-topic-1 source_topic_name: src-topic-1 - mirror_status: active + mirror_status: ACTIVE status_time_ms: 111111111 partition: 1 partition_mirror_lag: 285714 @@ -17,7 +17,7 @@ - link_name: link-1 mirror_topic_name: dest-topic-1 source_topic_name: src-topic-1 - mirror_status: active + mirror_status: ACTIVE status_time_ms: 111111111 partition: 2 partition_mirror_lag: 571428 diff --git a/test/fixtures/output/kafka/mirror/describe-mirror.golden b/test/fixtures/output/kafka/mirror/describe-mirror.golden index e339cbdb27..748e1c551a 100644 --- a/test/fixtures/output/kafka/mirror/describe-mirror.golden +++ b/test/fixtures/output/kafka/mirror/describe-mirror.golden @@ -1,5 +1,5 @@ Link Name | Mirror Topic Name | Source Topic Name | Mirror Status | Status Time (ms) | Partition | Partition Mirror Lag | Last Source Fetch Offset ------------+-------------------+-------------------+---------------+------------------+-----------+----------------------+--------------------------- - link-1 | dest-topic-1 | src-topic-1 | active | 111111111 | 0 | 142857 | 1293009 - link-1 | dest-topic-1 | src-topic-1 | active | 111111111 | 1 | 285714 | 28340404 - link-1 | dest-topic-1 | src-topic-1 | active | 111111111 | 2 | 571428 | 5739304 + link-1 | dest-topic-1 | src-topic-1 | ACTIVE | 111111111 | 0 | 142857 | 1293009 + link-1 | dest-topic-1 | src-topic-1 | ACTIVE | 111111111 | 1 | 285714 | 28340404 + link-1 | dest-topic-1 | src-topic-1 | ACTIVE | 111111111 | 2 | 571428 | 5739304 diff --git a/test/fixtures/output/kafka/mirror/failover-mirror.golden b/test/fixtures/output/kafka/mirror/failover-mirror.golden new file mode 100644 index 0000000000..58a725f2e8 --- /dev/null +++ b/test/fixtures/output/kafka/mirror/failover-mirror.golden @@ -0,0 +1,6 @@ + Mirror Topic Name | Partition | Partition Mirror Lag | Last Source Fetch Offset | Error Message | Error Code +--------------------+-----------+----------------------+--------------------------+----------------+------------- + topic 2 | -1 | -1 | -1 | Not authorized | 401 + topic-1 | 0 | 142857 | 1000 | | + topic-1 | 1 | 285714 | 10000 | | + topic-1 | 2 | 571428 | 100000 | | diff --git a/test/fixtures/output/kafka/mirror/list-all-mirror-json.golden b/test/fixtures/output/kafka/mirror/list-all-mirror-json.golden index 170b773f91..6ea31c29aa 100644 --- a/test/fixtures/output/kafka/mirror/list-all-mirror-json.golden +++ b/test/fixtures/output/kafka/mirror/list-all-mirror-json.golden @@ -3,7 +3,7 @@ "link_name": "link-1", "mirror_topic_name": "dest-topic-1", "source_topic_name": "src-topic-1", - "mirror_status": "active", + "mirror_status": "ACTIVE", "status_time_ms": 111111111, "num_partition": 3, "max_per_partition_mirror_lag": 571428 @@ -12,7 +12,7 @@ "link_name": "link-2", "mirror_topic_name": "dest-topic-2", "source_topic_name": "src-topic-2", - "mirror_status": "stopped", + "mirror_status": "STOPPED", "status_time_ms": 222222222, "num_partition": 2, "max_per_partition_mirror_lag": 0 diff --git a/test/fixtures/output/kafka/mirror/list-all-mirror-yaml.golden b/test/fixtures/output/kafka/mirror/list-all-mirror-yaml.golden index b72088a900..eb6b05de34 100644 --- a/test/fixtures/output/kafka/mirror/list-all-mirror-yaml.golden +++ b/test/fixtures/output/kafka/mirror/list-all-mirror-yaml.golden @@ -1,14 +1,14 @@ - link_name: link-1 mirror_topic_name: dest-topic-1 source_topic_name: src-topic-1 - mirror_status: active + mirror_status: ACTIVE status_time_ms: 111111111 num_partition: 3 max_per_partition_mirror_lag: 571428 - link_name: link-2 mirror_topic_name: dest-topic-2 source_topic_name: src-topic-2 - mirror_status: stopped + mirror_status: STOPPED status_time_ms: 222222222 num_partition: 2 max_per_partition_mirror_lag: 0 diff --git a/test/fixtures/output/kafka/mirror/list-all-mirror.golden b/test/fixtures/output/kafka/mirror/list-all-mirror.golden index 70552f81ee..a1a551bdff 100644 --- a/test/fixtures/output/kafka/mirror/list-all-mirror.golden +++ b/test/fixtures/output/kafka/mirror/list-all-mirror.golden @@ -1,4 +1,4 @@ Link Name | Mirror Topic Name | Source Topic Name | Mirror Status | Status Time (ms) | Num Partition | Max Per Partition Mirror Lag ------------+-------------------+-------------------+---------------+------------------+---------------+------------------------------- - link-1 | dest-topic-1 | src-topic-1 | active | 111111111 | 3 | 571428 - link-2 | dest-topic-2 | src-topic-2 | stopped | 222222222 | 2 | 0 + link-1 | dest-topic-1 | src-topic-1 | ACTIVE | 111111111 | 3 | 571428 + link-2 | dest-topic-2 | src-topic-2 | STOPPED | 222222222 | 2 | 0 diff --git a/test/fixtures/output/kafka/mirror/list-help.golden b/test/fixtures/output/kafka/mirror/list-help.golden index c3ad6182af..4625dbfe5c 100644 --- a/test/fixtures/output/kafka/mirror/list-help.golden +++ b/test/fixtures/output/kafka/mirror/list-help.golden @@ -14,7 +14,7 @@ List all active mirror topics under "my-link": Flags: --link string Name of cluster link. - --mirror-status string Mirror topic status. Can be one of [active, failed, paused, stopped, pending_stopped]. If not specified, list all mirror topics. + --mirror-status string Mirror topic status. Can be one of "active", "failed", "paused", "stopped", or "pending_stopped". If not specified, list all mirror topics. --cluster string Kafka cluster ID. --context string CLI context name. --environment string Environment ID. diff --git a/test/fixtures/output/kafka/mirror/list-mirror-json.golden b/test/fixtures/output/kafka/mirror/list-mirror-json.golden index 170b773f91..6ea31c29aa 100644 --- a/test/fixtures/output/kafka/mirror/list-mirror-json.golden +++ b/test/fixtures/output/kafka/mirror/list-mirror-json.golden @@ -3,7 +3,7 @@ "link_name": "link-1", "mirror_topic_name": "dest-topic-1", "source_topic_name": "src-topic-1", - "mirror_status": "active", + "mirror_status": "ACTIVE", "status_time_ms": 111111111, "num_partition": 3, "max_per_partition_mirror_lag": 571428 @@ -12,7 +12,7 @@ "link_name": "link-2", "mirror_topic_name": "dest-topic-2", "source_topic_name": "src-topic-2", - "mirror_status": "stopped", + "mirror_status": "STOPPED", "status_time_ms": 222222222, "num_partition": 2, "max_per_partition_mirror_lag": 0 diff --git a/test/fixtures/output/kafka/mirror/list-mirror-yaml.golden b/test/fixtures/output/kafka/mirror/list-mirror-yaml.golden index b72088a900..eb6b05de34 100644 --- a/test/fixtures/output/kafka/mirror/list-mirror-yaml.golden +++ b/test/fixtures/output/kafka/mirror/list-mirror-yaml.golden @@ -1,14 +1,14 @@ - link_name: link-1 mirror_topic_name: dest-topic-1 source_topic_name: src-topic-1 - mirror_status: active + mirror_status: ACTIVE status_time_ms: 111111111 num_partition: 3 max_per_partition_mirror_lag: 571428 - link_name: link-2 mirror_topic_name: dest-topic-2 source_topic_name: src-topic-2 - mirror_status: stopped + mirror_status: STOPPED status_time_ms: 222222222 num_partition: 2 max_per_partition_mirror_lag: 0 diff --git a/test/fixtures/output/kafka/mirror/list-mirror.golden b/test/fixtures/output/kafka/mirror/list-mirror.golden index 70552f81ee..a1a551bdff 100644 --- a/test/fixtures/output/kafka/mirror/list-mirror.golden +++ b/test/fixtures/output/kafka/mirror/list-mirror.golden @@ -1,4 +1,4 @@ Link Name | Mirror Topic Name | Source Topic Name | Mirror Status | Status Time (ms) | Num Partition | Max Per Partition Mirror Lag ------------+-------------------+-------------------+---------------+------------------+---------------+------------------------------- - link-1 | dest-topic-1 | src-topic-1 | active | 111111111 | 3 | 571428 - link-2 | dest-topic-2 | src-topic-2 | stopped | 222222222 | 2 | 0 + link-1 | dest-topic-1 | src-topic-1 | ACTIVE | 111111111 | 3 | 571428 + link-2 | dest-topic-2 | src-topic-2 | STOPPED | 222222222 | 2 | 0 diff --git a/test/fixtures/output/kafka/mirror/pause-mirror.golden b/test/fixtures/output/kafka/mirror/pause-mirror.golden new file mode 100644 index 0000000000..65e146748e --- /dev/null +++ b/test/fixtures/output/kafka/mirror/pause-mirror.golden @@ -0,0 +1,5 @@ + Mirror Topic Name | Partition | Partition Mirror Lag | Last Source Fetch Offset | Error Message | Error Code +--------------------+-----------+----------------------+--------------------------+----------------+------------- + topic 2 | -1 | -1 | -1 | Not authorized | 401 + topic-1 | 0 | 142857 | 1293009 | | + topic-1 | 1 | 285714 | 28340404 | | diff --git a/test/fixtures/output/kafka/mirror/resume-mirror.golden b/test/fixtures/output/kafka/mirror/resume-mirror.golden new file mode 100644 index 0000000000..a5bd0191f2 --- /dev/null +++ b/test/fixtures/output/kafka/mirror/resume-mirror.golden @@ -0,0 +1,5 @@ + Mirror Topic Name | Partition | Partition Mirror Lag | Last Source Fetch Offset | Error Message | Error Code +--------------------+-----------+----------------------+--------------------------+----------------+------------- + topic 2 | -1 | -1 | -1 | Not authorized | 401 + topic-1 | 0 | 142857 | 1000 | | + topic-1 | 1 | 285714 | 10000 | | diff --git a/test/kafka_test.go b/test/kafka_test.go index dd3a965649..0534b29230 100644 --- a/test/kafka_test.go +++ b/test/kafka_test.go @@ -148,9 +148,12 @@ func (s *CLITestSuite) TestKafka() { {args: "kafka mirror describe topic-1 --link link-1 --cluster lkc-describe-topic", fixture: "kafka/mirror/describe-mirror.golden", useKafka: "lkc-describe-topic"}, {args: "kafka mirror describe topic-1 --link link-1 --cluster lkc-describe-topic -o json", fixture: "kafka/mirror/describe-mirror-json.golden", useKafka: "lkc-describe-topic"}, {args: "kafka mirror describe topic-1 --link link-1 --cluster lkc-describe-topic -o yaml", fixture: "kafka/mirror/describe-mirror-yaml.golden", useKafka: "lkc-describe-topic"}, + {args: "kafka mirror failover topic1 topic2 --cluster lkc-describe-topic --link link-1", fixture: "kafka/mirror/failover-mirror.golden", useKafka: "lkc-describe-topic"}, + {args: "kafka mirror pause topic1 topic2 --cluster lkc-describe-topic --link link-1", fixture: "kafka/mirror/pause-mirror.golden", useKafka: "lkc-describe-topic"}, {args: "kafka mirror promote topic1 topic2 --cluster lkc-describe-topic --link link-1", fixture: "kafka/mirror/promote-mirror.golden", useKafka: "lkc-describe-topic"}, {args: "kafka mirror promote topic1 topic2 --cluster lkc-describe-topic --link link-1 -o json", fixture: "kafka/mirror/promote-mirror-json.golden", useKafka: "lkc-describe-topic"}, {args: "kafka mirror promote topic1 topic2 --cluster lkc-describe-topic --link link-1 -o yaml", fixture: "kafka/mirror/promote-mirror-yaml.golden", useKafka: "lkc-describe-topic"}, + {args: "kafka mirror resume topic1 topic2 --cluster lkc-describe-topic --link link-1", fixture: "kafka/mirror/resume-mirror.golden", useKafka: "lkc-describe-topic"}, } if runtime.GOOS != "windows" { diff --git a/test/test-server/kafka_rest_router.go b/test/test-server/kafka_rest_router.go index 48fa575b8a..f152d3478a 100644 --- a/test/test-server/kafka_rest_router.go +++ b/test/test-server/kafka_rest_router.go @@ -22,8 +22,8 @@ type route struct { } var kafkaRestRoutes = []route{ - {"/kafka/v3/clusters", handleKafkaRPClusters}, - {"/kafka/v3/clusters/{cluster_id}/acls:batch", handleKafkaRPACLsBatch}, + {"/kafka/v3/clusters", handleKafkaRestClusters}, + {"/kafka/v3/clusters/{cluster_id}/acls:batch", handleKafkaRestACLsBatch}, {"/kafka/v3/clusters/{cluster_id}/broker-configs", handleKafkaBrokerConfigs}, {"/kafka/v3/clusters/{cluster_id}/broker-configs/{name}", handleKafkaBrokerConfigsName}, {"/kafka/v3/clusters/{cluster_id}/broker-configs:alter", handleKafkaBrokerConfigsAlter}, @@ -36,31 +36,34 @@ var kafkaRestRoutes = []route{ {"/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/configs:alter", handleKafkaBrokerIdConfigsAlter}, {"/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/tasks", handleKafkaClustersClusterIdBrokersBrokerIdTasksGet}, {"/kafka/v3/clusters/{cluster_id}/brokers/{broker_id}/tasks/{task_type}", handleKafkaClustersClusterIdBrokersBrokerIdTasksTaskTypeGet}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups", handleKafkaRPConsumerGroups}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}", handleKafkaRPConsumerGroup}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/consumers", handleKafkaRPConsumers}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lag-summary", handleKafkaRPLagSummary}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags", handleKafkaRPLags}, - {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags/{topic_name}/partitions/{partition_id}", handleKafkaRPLag}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups", handleKafkaRestConsumerGroups}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}", handleKafkaRestConsumerGroup}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/consumers", handleKafkaRestConsumers}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lag-summary", handleKafkaRestLagSummary}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags", handleKafkaRestLags}, + {"/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags/{topic_name}/partitions/{partition_id}", handleKafkaRestLag}, {"/kafka/v3/clusters/{cluster_id}/topics/{topic_name}/partitions", handleKafkaTopicPartitions}, {"/kafka/v3/clusters/{cluster_id}/topics/{topic_name}/partitions/{partition_id}", handleKafkaTopicPartitionId}, {"/kafka/v3/clusters/{cluster_id}/topics/{topic_name}/partitions/{partition_id}/reassignment", handleKafkaTopicPartitionIdReassignment}, - {"/kafka/v3/clusters/{cluster_id}/topics/{topic}/partitions/-/replica-status", handleKafkaRPReplicaStatus}, - {"/kafka/v3/clusters/{cluster}/acls", handleKafkaRPACLs}, - {"/kafka/v3/clusters/{cluster}/links", handleKafkaRPLinks}, - {"/kafka/v3/clusters/{cluster}/links/-/mirrors", handleKafkaRPAllMirrors}, - {"/kafka/v3/clusters/{cluster}/links/{link}", handleKafkaRPLink}, - {"/kafka/v3/clusters/{cluster}/links/{link}/configs", handleKafkaRPLinkConfigs}, - {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors", handleKafkaRPMirrors}, - {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors/{mirror_topic_name}", handleKafkaRPMirror}, - {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors:promote", handleKafkaRPMirrorsPromote}, + {"/kafka/v3/clusters/{cluster_id}/topics/{topic}/partitions/-/replica-status", handleKafkaRestReplicaStatus}, + {"/kafka/v3/clusters/{cluster}/acls", handleKafkaRestACLs}, + {"/kafka/v3/clusters/{cluster}/links", handleKafkaRestLinks}, + {"/kafka/v3/clusters/{cluster}/links/-/mirrors", handleKafkaRestAllMirrors}, + {"/kafka/v3/clusters/{cluster}/links/{link}", handleKafkaRestLink}, + {"/kafka/v3/clusters/{cluster}/links/{link}/configs", handleKafkaRestLinkConfigs}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors", handleKafkaRestMirrors}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors/{mirror_topic_name}", handleKafkaRestMirror}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors:failover", handleKafkaRestMirrorsFailover}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors:pause", handleKafkaRestMirrorsPause}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors:promote", handleKafkaRestMirrorsPromote}, + {"/kafka/v3/clusters/{cluster}/links/{link}/mirrors:resume", handleKafkaRestMirrorsResume}, {"/kafka/v3/clusters/{cluster}/topic/{topic}/partitions/-/replica-status", handleClustersClusterIdTopicsTopicsNamePartitionsReplicaStatus}, - {"/kafka/v3/clusters/{cluster}/topics", handleKafkaRPTopics}, - {"/kafka/v3/clusters/{cluster}/topics/{topic}", handleKafkaRPTopic}, - {"/kafka/v3/clusters/{cluster}/topics/{topic}/configs", handleKafkaRPTopicConfigs}, - {"/kafka/v3/clusters/{cluster}/topics/{topic}/configs:alter", handleKafkaRPConfigsAlter}, + {"/kafka/v3/clusters/{cluster}/topics", handleKafkaRestTopics}, + {"/kafka/v3/clusters/{cluster}/topics/{topic}", handleKafkaRestTopic}, + {"/kafka/v3/clusters/{cluster}/topics/{topic}/configs", handleKafkaRestTopicConfigs}, + {"/kafka/v3/clusters/{cluster}/topics/{topic}/configs:alter", handleKafkaRestConfigsAlter}, {"/kafka/v3/clusters/{cluster}/topics/{topic}/partitions/{partition}/replica-status", handleClustersClusterIdTopicsTopicNamePartitionsPartitionIdReplicaStatus}, - {"/kafka/v3/clusters/{cluster}/topics/{topic}/partitions/{partition}/replicas", handleKafkaRPPartitionReplicas}, + {"/kafka/v3/clusters/{cluster}/topics/{topic}/partitions/{partition}/replicas", handleKafkaRestPartitionReplicas}, } func NewKafkaRestProxyRouter(t *testing.T) *mux.Router { @@ -75,7 +78,7 @@ func NewKafkaRestProxyRouter(t *testing.T) *mux.Router { } // Handler for: "/kafka/v3/clusters" -func handleKafkaRPClusters(t *testing.T) http.HandlerFunc { +func handleKafkaRestClusters(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // List Clusters if r.Method == http.MethodGet { @@ -104,7 +107,7 @@ func handleKafkaRPClusters(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/acls" -func handleKafkaRPACLs(t *testing.T) http.HandlerFunc { +func handleKafkaRestACLs(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { data := []cckafkarestv3.AclData{{ ResourceType: cckafkarestv3.TOPIC, @@ -134,14 +137,14 @@ func handleKafkaRPACLs(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/acls:batch" -func handleKafkaRPACLsBatch(_ *testing.T) http.HandlerFunc { +func handleKafkaRestACLsBatch(_ *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } } // Handler for: "/kafka/v3/clusters/{cluster}/topics" -func handleKafkaRPTopics(t *testing.T) http.HandlerFunc { +func handleKafkaRestTopics(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: @@ -238,7 +241,7 @@ func handleKafkaRPTopics(t *testing.T) http.HandlerFunc { func PtrString(v string) *string { return &v } // Handler for: "/kafka/v3/clusters/{cluster}/topics/{topic}/configs" -func handleKafkaRPTopicConfigs(t *testing.T) http.HandlerFunc { +func handleKafkaRestTopicConfigs(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) topicName := vars["topic"] @@ -310,7 +313,7 @@ func handleKafkaRPTopicConfigs(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/topics/{topic}/partitions/-/replica-status" -func handleKafkaRPReplicaStatus(t *testing.T) http.HandlerFunc { +func handleKafkaRestReplicaStatus(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) topic := vars["topic"] @@ -387,7 +390,7 @@ func handleKafkaRPReplicaStatus(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/topics/{topic}/partitions/{partition}/replicas" -func handleKafkaRPPartitionReplicas(t *testing.T) http.HandlerFunc { +func handleKafkaRestPartitionReplicas(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) topicName := vars["topic"] @@ -474,7 +477,7 @@ func handleKafkaRPPartitionReplicas(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/topics/{topic}/configs:alter" -func handleKafkaRPConfigsAlter(t *testing.T) http.HandlerFunc { +func handleKafkaRestConfigsAlter(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) topicName := vars["topic"] @@ -517,7 +520,7 @@ func handleKafkaRPConfigsAlter(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/topics/{topic}" -func handleKafkaRPTopic(t *testing.T) http.HandlerFunc { +func handleKafkaRestTopic(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { topic := mux.Vars(r)["topic"] if topic != "topic-exist" && topic != "topic-exist-2" && topic != "topic-exist-rest" { @@ -540,7 +543,7 @@ func handleKafkaRPTopic(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/links" -func handleKafkaRPLinks(t *testing.T) http.HandlerFunc { +func handleKafkaRestLinks(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: @@ -600,7 +603,7 @@ func handleKafkaRPLinks(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups" -func handleKafkaRPConsumerGroups(t *testing.T) http.HandlerFunc { +func handleKafkaRestConsumerGroups(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: @@ -640,7 +643,7 @@ func handleKafkaRPConsumerGroups(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster}/links/{link}" -func handleKafkaRPLink(t *testing.T) http.HandlerFunc { +func handleKafkaRestLink(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { cluster := mux.Vars(r)["cluster"] link := mux.Vars(r)["link"] @@ -706,7 +709,7 @@ func handleKafkaRPLink(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}" -func handleKafkaRPConsumerGroup(t *testing.T) http.HandlerFunc { +func handleKafkaRestConsumerGroup(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) switch r.Method { @@ -734,7 +737,7 @@ func handleKafkaRPConsumerGroup(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/links/-/mirrors" -func handleKafkaRPAllMirrors(t *testing.T) http.HandlerFunc { +func handleKafkaRestAllMirrors(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: @@ -743,54 +746,54 @@ func handleKafkaRPAllMirrors(t *testing.T) http.HandlerFunc { err := json.NewDecoder(r.Body).Decode(&req) require.NoError(t, err) case http.MethodGet: - err := json.NewEncoder(w).Encode(cpkafkarestv3.ListMirrorTopicsResponseDataList{Data: []cpkafkarestv3.ListMirrorTopicsResponseData{ + err := json.NewEncoder(w).Encode(cckafkarestv3.ListMirrorTopicsResponseDataList{Data: []cckafkarestv3.ListMirrorTopicsResponseData{ { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, LinkName: "link-1", MirrorTopicName: "dest-topic-1", SourceTopicName: "src-topic-1", NumPartitions: 3, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 142857, - LastSourceFetchOffset: 1293009, - }, - { - Partition: 1, - Lag: 285714, - LastSourceFetchOffset: 28340404, - }, - { - Partition: 2, - Lag: 571428, - LastSourceFetchOffset: 5739304, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, }, - MirrorStatus: "active", + MirrorStatus: cckafkarestv3.ACTIVE, StateTimeMs: 111111111, }, { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, LinkName: "link-2", MirrorTopicName: "dest-topic-2", SourceTopicName: "src-topic-2", NumPartitions: 2, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 0, - LastSourceFetchOffset: 0, - }, - { - Partition: 1, - Lag: 0, - LastSourceFetchOffset: 0, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 0, + LastSourceFetchOffset: 0, + }, + { + Partition: 1, + Lag: 0, + LastSourceFetchOffset: 0, + }, }, }, - MirrorStatus: "stopped", + MirrorStatus: cckafkarestv3.STOPPED, StateTimeMs: 222222222, }, }}) @@ -800,7 +803,7 @@ func handleKafkaRPAllMirrors(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/consumers" -func handleKafkaRPConsumers(t *testing.T) http.HandlerFunc { +func handleKafkaRestConsumers(t *testing.T) http.HandlerFunc { instance1 := "instance-1" instance2 := "instance-2" return func(w http.ResponseWriter, r *http.Request) { @@ -838,7 +841,7 @@ func handleKafkaRPConsumers(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors" -func handleKafkaRPMirrors(t *testing.T) http.HandlerFunc { +func handleKafkaRestMirrors(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: @@ -847,54 +850,54 @@ func handleKafkaRPMirrors(t *testing.T) http.HandlerFunc { err := json.NewDecoder(r.Body).Decode(&req) require.NoError(t, err) case http.MethodGet: - err := json.NewEncoder(w).Encode(cpkafkarestv3.ListMirrorTopicsResponseDataList{Data: []cpkafkarestv3.ListMirrorTopicsResponseData{ + err := json.NewEncoder(w).Encode(cckafkarestv3.ListMirrorTopicsResponseDataList{Data: []cckafkarestv3.ListMirrorTopicsResponseData{ { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, LinkName: "link-1", MirrorTopicName: "dest-topic-1", SourceTopicName: "src-topic-1", NumPartitions: 3, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 142857, - LastSourceFetchOffset: 1293009, - }, - { - Partition: 1, - Lag: 285714, - LastSourceFetchOffset: 28340404, - }, - { - Partition: 2, - Lag: 571428, - LastSourceFetchOffset: 5739304, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, }, - MirrorStatus: "active", + MirrorStatus: cckafkarestv3.ACTIVE, StateTimeMs: 111111111, }, { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, LinkName: "link-2", MirrorTopicName: "dest-topic-2", SourceTopicName: "src-topic-2", NumPartitions: 2, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 0, - LastSourceFetchOffset: 0, - }, - { - Partition: 1, - Lag: 0, - LastSourceFetchOffset: 0, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 0, + LastSourceFetchOffset: 0, + }, + { + Partition: 1, + Lag: 0, + LastSourceFetchOffset: 0, + }, }, }, - MirrorStatus: "stopped", + MirrorStatus: cckafkarestv3.STOPPED, StateTimeMs: 222222222, }, }}) @@ -904,7 +907,7 @@ func handleKafkaRPMirrors(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lag-summary" -func handleKafkaRPLagSummary(t *testing.T) http.HandlerFunc { +func handleKafkaRestLagSummary(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) switch r.Method { @@ -935,59 +938,219 @@ func handleKafkaRPLagSummary(t *testing.T) http.HandlerFunc { } } -// Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors:promote" -func handleKafkaRPMirrorsPromote(t *testing.T) http.HandlerFunc { +// Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors:failover" +func handleKafkaRestMirrorsFailover(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodPost: - errorMsg := "Not authorized" - var errorCode int32 = 401 - err := json.NewEncoder(w).Encode(cpkafkarestv3.AlterMirrorStatusResponseDataList{Data: []cpkafkarestv3.AlterMirrorStatusResponseData{ + err := json.NewEncoder(w).Encode(cckafkarestv3.AlterMirrorStatusResponseDataList{Data: []cckafkarestv3.AlterMirrorStatusResponseData{ { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, - MirrorTopicName: "dest-topic-1", - ErrorMessage: nil, - ErrorCode: nil, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 142857, - LastSourceFetchOffset: 1293009, + MirrorTopicName: "topic-1", + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1000, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 10000, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 100000, + }, }, - { - Partition: 1, - Lag: 285714, - LastSourceFetchOffset: 28340404, + }, + }, + { + MirrorTopicName: "topic 2", + ErrorMessage: *cckafkarestv3.NewNullableString(cckafkarestv3.PtrString("Not authorized")), + ErrorCode: *cckafkarestv3.NewNullableInt32(cckafkarestv3.PtrInt32(401)), + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, - { - Partition: 2, - Lag: 571428, - LastSourceFetchOffset: 5739304, + }, + }, + }}) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors:pause" +func handleKafkaRestMirrorsPause(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + err := json.NewEncoder(w).Encode(cckafkarestv3.AlterMirrorStatusResponseDataList{Data: []cckafkarestv3.AlterMirrorStatusResponseData{ + { + MirrorTopicName: "topic-1", + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, }, }, }, { - Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, + MirrorTopicName: "topic 2", + ErrorMessage: *cckafkarestv3.NewNullableString(cckafkarestv3.PtrString("Not authorized")), + ErrorCode: *cckafkarestv3.NewNullableInt32(cckafkarestv3.PtrInt32(401)), + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, + }, + }, + }, + }}) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors:promote" +func handleKafkaRestMirrorsPromote(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + err := json.NewEncoder(w).Encode(cckafkarestv3.AlterMirrorStatusResponseDataList{Data: []cckafkarestv3.AlterMirrorStatusResponseData{ + { MirrorTopicName: "dest-topic-1", - ErrorMessage: &errorMsg, - ErrorCode: &errorCode, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 142857, - LastSourceFetchOffset: 1293009, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, - { - Partition: 1, - Lag: 285714, - LastSourceFetchOffset: 28340404, + }, + }, + { + MirrorTopicName: "dest-topic-1", + ErrorMessage: *cckafkarestv3.NewNullableString(cckafkarestv3.PtrString("Not authorized")), + ErrorCode: *cckafkarestv3.NewNullableInt32(cckafkarestv3.PtrInt32(401)), + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, - { - Partition: 2, - Lag: 571428, - LastSourceFetchOffset: 5739304, + }, + }, + }}) + require.NoError(t, err) + } + } +} + +// Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors:resume" +func handleKafkaRestMirrorsResume(t *testing.T) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodPost: + err := json.NewEncoder(w).Encode(cckafkarestv3.AlterMirrorStatusResponseDataList{Data: []cckafkarestv3.AlterMirrorStatusResponseData{ + { + MirrorTopicName: "topic-1", + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1000, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 10000, + }, + }, + }, + }, + { + MirrorTopicName: "topic 2", + ErrorMessage: *cckafkarestv3.NewNullableString(cckafkarestv3.PtrString("Not authorized")), + ErrorCode: *cckafkarestv3.NewNullableInt32(cckafkarestv3.PtrInt32(401)), + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, }, }, @@ -998,7 +1161,7 @@ func handleKafkaRPMirrorsPromote(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags" -func handleKafkaRPLags(t *testing.T) http.HandlerFunc { +func handleKafkaRestLags(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) switch r.Method { @@ -1050,7 +1213,7 @@ func handleKafkaRPLags(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/configs" -func handleKafkaRPLinkConfigs(t *testing.T) http.HandlerFunc { +func handleKafkaRestLinkConfigs(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { link := mux.Vars(r)["link"] if link == "link-dne" { @@ -1070,7 +1233,7 @@ func handleKafkaRPLinkConfigs(t *testing.T) http.HandlerFunc { { Kind: "", Metadata: cpkafkarestv3.ResourceMetadata{}, - ClusterId: "cluster-1", + ClusterId: "lkc-describe-topic", Name: "replica.fetch.max.bytes", Value: "1048576", ReadOnly: false, @@ -1082,7 +1245,7 @@ func handleKafkaRPLinkConfigs(t *testing.T) http.HandlerFunc { { Kind: "", Metadata: cpkafkarestv3.ResourceMetadata{}, - ClusterId: "cluster-1", + ClusterId: "lkc-describe-topic", Name: "bootstrap.servers", Value: "bitcoin.com:8888", ReadOnly: false, @@ -1094,7 +1257,7 @@ func handleKafkaRPLinkConfigs(t *testing.T) http.HandlerFunc { { Kind: "", Metadata: cpkafkarestv3.ResourceMetadata{}, - ClusterId: "cluster-1", + ClusterId: "lkc-describe-topic", Name: "link.mode", Value: linkMode, ReadOnly: false, @@ -1110,35 +1273,37 @@ func handleKafkaRPLinkConfigs(t *testing.T) http.HandlerFunc { } // Handler for: "/kafka/v3/clusters/{cluster_id}/links/{link_name}/mirrors/{mirror_name}" -func handleKafkaRPMirror(t *testing.T) http.HandlerFunc { +func handleKafkaRestMirror(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: - err := json.NewEncoder(w).Encode(cpkafkarestv3.ListMirrorTopicsResponseData{ + err := json.NewEncoder(w).Encode(cckafkarestv3.ListMirrorTopicsResponseData{ Kind: "", - Metadata: cpkafkarestv3.ResourceMetadata{}, + Metadata: cckafkarestv3.ResourceMetadata{}, LinkName: "link-1", MirrorTopicName: "dest-topic-1", SourceTopicName: "src-topic-1", NumPartitions: 3, - MirrorLags: []cpkafkarestv3.MirrorLag{ - { - Partition: 0, - Lag: 142857, - LastSourceFetchOffset: 1293009, - }, - { - Partition: 1, - Lag: 285714, - LastSourceFetchOffset: 28340404, - }, - { - Partition: 2, - Lag: 571428, - LastSourceFetchOffset: 5739304, + MirrorLags: cckafkarestv3.MirrorLags{ + Items: []cckafkarestv3.MirrorLag{ + { + Partition: 0, + Lag: 142857, + LastSourceFetchOffset: 1293009, + }, + { + Partition: 1, + Lag: 285714, + LastSourceFetchOffset: 28340404, + }, + { + Partition: 2, + Lag: 571428, + LastSourceFetchOffset: 5739304, + }, }, }, - MirrorStatus: "active", + MirrorStatus: cckafkarestv3.ACTIVE, StateTimeMs: 111111111, }) require.NoError(t, err) @@ -1152,7 +1317,7 @@ type partitionOffsets struct { } // Handler for: "/kafka/v3/clusters/{cluster_id}/consumer-groups/{consumer_group_id}/lags/{topic_name}/partitions/{partition_id}" -func handleKafkaRPLag(t *testing.T) http.HandlerFunc { +func handleKafkaRestLag(t *testing.T) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r)