Skip to content

Commit

Permalink
Cloud Kafka REST cleanup (#2144)
Browse files Browse the repository at this point in the history
  • Loading branch information
sgagniere authored Aug 3, 2023
1 parent 2ac4239 commit 518e9c2
Show file tree
Hide file tree
Showing 65 changed files with 742 additions and 747 deletions.
13 changes: 6 additions & 7 deletions internal/cmd/asyncapi/command_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/log"
"github.com/confluentinc/cli/internal/pkg/output"
schemaregistry "github.com/confluentinc/cli/internal/pkg/schema-registry"
Expand Down Expand Up @@ -191,7 +190,7 @@ func (c *command) getChannelDetails(details *accountDetails, flags *flags) error
}
details.channelDetails.example = example
}
bindings, err := c.getBindings(details.kafkaClusterId, details.channelDetails.currentTopic.GetTopicName())
bindings, err := c.getBindings(details.channelDetails.currentTopic.GetTopicName())
if err != nil {
log.CliLogger.Warnf("Bindings not found: %v", err)
}
Expand Down Expand Up @@ -302,17 +301,17 @@ func (c command) getMessageExamples(consumer *ckgo.Consumer, topicName, contentT
return jsonMessage, nil
}

func (c *command) getBindings(clusterId, topicName string) (*bindings, error) {
func (c *command) getBindings(topicName string) (*bindings, error) {
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return nil, err
}
configs, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(clusterId, topicName)
configs, err := kafkaREST.CloudClient.ListKafkaTopicConfigs(topicName)
if err != nil {
return nil, err
}
var numPartitions int32
partitionsResp, _, err := kafkaREST.CloudClient.ListKafkaPartitions(clusterId, topicName)
partitionsResp, _, err := kafkaREST.CloudClient.ListKafkaPartitions(topicName)
if err != nil {
return nil, fmt.Errorf("unable to get topic partitions: %v", err)
}
Expand Down Expand Up @@ -401,9 +400,9 @@ func (c *command) getClusterDetails(details *accountDetails, flags *flags) error
return err
}

topics, httpResp, err := kafkaREST.CloudClient.ListKafkaTopics(clusterConfig.ID)
topics, err := kafkaREST.CloudClient.ListKafkaTopics()
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}

environment, err := c.Context.EnvironmentId()
Expand Down
15 changes: 7 additions & 8 deletions internal/cmd/asyncapi/command_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,16 +258,16 @@ func (c *command) addTopic(details *accountDetails, topicName string, kafkaBindi
return true, false, nil
}
// Overwrite existing topic
err := c.updateTopic(details, topicName, kafkaBinding)
err := c.updateTopic(topicName, kafkaBinding)
return true, false, err
}
}
// Create a new topic
newTopicCreated, err := c.createTopic(details, topicName, kafkaBinding)
newTopicCreated, err := c.createTopic(topicName, kafkaBinding)
return false, newTopicCreated, err
}

func (c *command) createTopic(details *accountDetails, topicName string, kafkaBinding kafkaBinding) (bool, error) {
func (c *command) createTopic(topicName string, kafkaBinding kafkaBinding) (bool, error) {
log.CliLogger.Infof("Topic not found. Adding a new topic: %s", topicName)
topicConfigs := []kafkarestv3.CreateTopicRequestDataConfigs{}
for configName, configValue := range combineTopicConfigs(kafkaBinding) {
Expand All @@ -288,8 +288,7 @@ func (c *command) createTopic(details *accountDetails, topicName string, kafkaBi
if err != nil {
return false, err
}
if _, httpResp, err := kafkaRest.CloudClient.CreateKafkaTopic(details.kafkaClusterId,
createTopicRequestData); err != nil {
if _, httpResp, err := kafkaRest.CloudClient.CreateKafkaTopic(createTopicRequestData); err != nil {
restErr, parseErr := kafkarest.ParseOpenAPIErrorCloud(err)
if parseErr == nil && restErr.Code == ccloudv2.BadRequestErrorCode {
// Print partition limit error w/ suggestion
Expand All @@ -303,15 +302,15 @@ func (c *command) createTopic(details *accountDetails, topicName string, kafkaBi
return true, nil
}

func (c *command) updateTopic(details *accountDetails, topicName string, kafkaBinding kafkaBinding) error {
func (c *command) updateTopic(topicName string, kafkaBinding kafkaBinding) error {
// Overwrite topic configs
updateConfigs := []kafkarestv3.AlterConfigBatchRequestDataData{}
modifiableConfigs := []string{}
kafkaRest, err := c.GetKafkaREST()
if err != nil {
return err
}
configs, err := kafkaRest.CloudClient.ListKafkaTopicConfigs(details.kafkaClusterId, topicName)
configs, err := kafkaRest.CloudClient.ListKafkaTopicConfigs(topicName)
if err != nil {
return err
}
Expand All @@ -331,7 +330,7 @@ func (c *command) updateTopic(details *accountDetails, topicName string, kafkaBi
}
log.CliLogger.Info("Overwriting topic configs")
if updateConfigs != nil {
_, err = kafkaRest.CloudClient.UpdateKafkaTopicConfigBatch(details.kafkaClusterId, topicName, kafkarestv3.AlterConfigBatchRequestData{Data: updateConfigs})
_, err = kafkaRest.CloudClient.UpdateKafkaTopicConfigBatch(topicName, kafkarestv3.AlterConfigBatchRequestData{Data: updateConfigs})
if err != nil {
return fmt.Errorf("unable to update topic configs: %v", err)
}
Expand Down
14 changes: 4 additions & 10 deletions internal/cmd/kafka/command_acl_create.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/confluentinc/cli/internal/pkg/ccstructs"
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
)

func (c *aclCommand) newCreateCommand() *cobra.Command {
Expand Down Expand Up @@ -66,27 +65,22 @@ func (c *aclCommand) create(cmd *cobra.Command, _ []string) error {
bindings[i] = acl.ACLBinding
}

kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand()
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return err
}

if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil {
return err
}

kafkaREST, err := c.GetKafkaREST()
if err != nil {
if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil {
return err
}

for i, binding := range bindings {
data := pacl.GetCreateAclRequestData(binding)
if httpResp, err := kafkaREST.CloudClient.CreateKafkaAcls(kafkaClusterConfig.ID, data); err != nil {
if err := kafkaREST.CloudClient.CreateKafkaAcls(data); err != nil {
if i > 0 {
_ = pacl.PrintACLs(cmd, bindings[:i])
}
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}
}

Expand Down
18 changes: 6 additions & 12 deletions internal/cmd/kafka/command_acl_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/form"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/output"
)

Expand Down Expand Up @@ -60,25 +59,20 @@ func (c *aclCommand) delete(cmd *cobra.Command, _ []string) error {
filters[i] = convertToFilter(acl.ACLBinding)
}

kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand()
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return err
}

if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil {
return err
}

kafkaREST, err := c.GetKafkaREST()
if err != nil {
if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil {
return err
}

count := 0
for _, acl := range acls {
aclDataList, httpResp, err := kafkaREST.CloudClient.GetKafkaAcls(kafkaClusterConfig.ID, acl.ACLBinding)
aclDataList, err := kafkaREST.CloudClient.GetKafkaAcls(acl.ACLBinding)
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}
if len(aclDataList.Data) == 0 {
return errors.NewErrorWithSuggestions("one or more ACLs matching these parameters not found", ValidACLSuggestion)
Expand All @@ -96,12 +90,12 @@ func (c *aclCommand) delete(cmd *cobra.Command, _ []string) error {

count = 0
for i, filter := range filters {
deleteResp, httpResp, err := kafkaREST.CloudClient.DeleteKafkaAcls(kafkaClusterConfig.ID, filter)
deleteResp, err := kafkaREST.CloudClient.DeleteKafkaAcls(filter)
if err != nil {
if i > 0 {
output.ErrPrintln(printAclsDeleted(count))
}
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}

count += len(deleteResp.Data)
Expand Down
12 changes: 3 additions & 9 deletions internal/cmd/kafka/command_acl_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

aclutil "github.com/confluentinc/cli/internal/pkg/acl"
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
)

func (c *aclCommand) newListCommand() *cobra.Command {
Expand Down Expand Up @@ -43,24 +42,19 @@ func (c *aclCommand) list(cmd *cobra.Command, _ []string) error {
return acl[0].errors
}

kafkaClusterConfig, err := c.Context.GetKafkaClusterForCommand()
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return err
}

if err := c.provisioningClusterCheck(kafkaClusterConfig.ID); err != nil {
if err := c.provisioningClusterCheck(kafkaREST.GetClusterId()); err != nil {
return err
}

kafkaREST, err := c.GetKafkaREST()
aclDataList, err := kafkaREST.CloudClient.GetKafkaAcls(acl[0].ACLBinding)
if err != nil {
return err
}

aclDataList, httpResp, err := kafkaREST.CloudClient.GetKafkaAcls(kafkaClusterConfig.ID, acl[0].ACLBinding)
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
}

return aclutil.PrintACLsFromKafkaRestResponse(cmd, aclDataList.Data)
}
7 changes: 1 addition & 6 deletions internal/cmd/kafka/command_cluster_configuration_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,7 @@ func (c *clusterCommand) configurationDescribe(cmd *cobra.Command, args []string
return err
}

cluster, err := c.Context.GetKafkaClusterForCommand()
if err != nil {
return err
}

config, err := kafkaREST.CloudClient.GetKafkaClusterConfig(cluster.ID, args[0])
config, err := kafkaREST.CloudClient.GetKafkaClusterConfig(args[0])
if err != nil {
return catchConfigurationNotFound(err, args[0])
}
Expand Down
7 changes: 1 addition & 6 deletions internal/cmd/kafka/command_cluster_configuration_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,7 @@ func (c *clusterCommand) configurationList(cmd *cobra.Command, args []string) er
return err
}

cluster, err := c.Context.GetKafkaClusterForCommand()
if err != nil {
return err
}

configs, err := kafkaREST.CloudClient.ListKafkaClusterConfigs(cluster.ID)
configs, err := kafkaREST.CloudClient.ListKafkaClusterConfigs()
if err != nil {
return err
}
Expand Down
7 changes: 1 addition & 6 deletions internal/cmd/kafka/command_cluster_configuration_update.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,6 @@ func (c *clusterCommand) configurationUpdate(cmd *cobra.Command, _ []string) err
return err
}

cluster, err := c.Context.GetKafkaClusterForCommand()
if err != nil {
return err
}

config, err := cmd.Flags().GetStringSlice("config")
if err != nil {
return err
Expand All @@ -71,7 +66,7 @@ func (c *clusterCommand) configurationUpdate(cmd *cobra.Command, _ []string) err
}

req := kafkarestv3.AlterConfigBatchRequestData{Data: data}
if err := kafkaREST.CloudClient.UpdateKafkaClusterConfigs(cluster.ID, req); err != nil {
if err := kafkaREST.CloudClient.UpdateKafkaClusterConfigs(req); err != nil {
return err
}

Expand Down
5 changes: 2 additions & 3 deletions internal/cmd/kafka/command_cluster_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
v1 "github.com/confluentinc/cli/internal/pkg/config/v1"
"github.com/confluentinc/cli/internal/pkg/errors"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/log"
"github.com/confluentinc/cli/internal/pkg/output"
"github.com/confluentinc/cli/internal/pkg/resource"
Expand Down Expand Up @@ -180,9 +179,9 @@ func (c *clusterCommand) getTopicCountForKafkaCluster(cluster *cmkv2.CmkV2Cluste
return 0, err
}

topics, httpResp, err := kafkaREST.CloudClient.ListKafkaTopics(cluster.GetId())
topics, err := kafkaREST.CloudClient.ListKafkaTopics()
if err != nil {
return 0, kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return 0, err
}

return len(topics.Data), nil
Expand Down
7 changes: 3 additions & 4 deletions internal/cmd/kafka/command_consumergroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
kafkarestv3 "github.com/confluentinc/ccloud-sdk-go-v2/kafkarest/v3"

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
)

type consumerGroupCommand struct {
Expand Down Expand Up @@ -83,14 +82,14 @@ func (c *consumerGroupCommand) autocompleteConsumerGroups() []string {
}

func listConsumerGroups(flagCmd *pcmd.AuthenticatedCLICommand) (*kafkarestv3.ConsumerGroupDataList, error) {
kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(flagCmd)
kafkaREST, err := flagCmd.GetKafkaREST()
if err != nil {
return nil, err
}

groupCmdResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups(lkc)
groupCmdResp, err := kafkaREST.CloudClient.ListKafkaConsumerGroups()
if err != nil {
return nil, kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return nil, err
}

return &groupCmdResp, nil
Expand Down
11 changes: 5 additions & 6 deletions internal/cmd/kafka/command_consumergroup_describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/output"
)

Expand Down Expand Up @@ -40,19 +39,19 @@ func (c *consumerGroupCommand) newDescribeCommand() *cobra.Command {
func (c *consumerGroupCommand) describe(cmd *cobra.Command, args []string) error {
consumerGroupId := args[0]

kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand)
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return err
}

groupCmdResp, httpResp, err := kafkaREST.CloudClient.GetKafkaConsumerGroup(lkc, consumerGroupId)
groupCmdResp, err := kafkaREST.CloudClient.GetKafkaConsumerGroup(consumerGroupId)
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}

groupCmdConsumersResp, httpResp, err := kafkaREST.CloudClient.ListKafkaConsumers(lkc, consumerGroupId)
groupCmdConsumersResp, err := kafkaREST.CloudClient.ListKafkaConsumers(consumerGroupId)
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}

groupData := getGroupData(groupCmdResp, groupCmdConsumersResp)
Expand Down
7 changes: 3 additions & 4 deletions internal/cmd/kafka/command_consumergroup_lag_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (

pcmd "github.com/confluentinc/cli/internal/pkg/cmd"
"github.com/confluentinc/cli/internal/pkg/examples"
"github.com/confluentinc/cli/internal/pkg/kafkarest"
"github.com/confluentinc/cli/internal/pkg/output"
)

Expand Down Expand Up @@ -52,14 +51,14 @@ func (c *lagCommand) getLag(cmd *cobra.Command, args []string) error {
return err
}

kafkaREST, lkc, err := getKafkaRestProxyAndLkcId(c.AuthenticatedCLICommand)
kafkaREST, err := c.GetKafkaREST()
if err != nil {
return err
}

lagGetResp, httpResp, err := kafkaREST.CloudClient.GetKafkaConsumerLag(lkc, consumerGroupId, topic, partition)
lagGetResp, err := kafkaREST.CloudClient.GetKafkaConsumerLag(consumerGroupId, topic, partition)
if err != nil {
return kafkarest.NewError(kafkaREST.CloudClient.GetUrl(), err, httpResp)
return err
}

table := output.NewTable(cmd)
Expand Down
Loading

0 comments on commit 518e9c2

Please sign in to comment.