Skip to content

Commit

Permalink
[confluent-local] [CLI-2583] print image downloading progress, print …
Browse files Browse the repository at this point in the history
…ports and allow overwriting local ports (#2076)
  • Loading branch information
MuweiHe authored Jul 20, 2023
1 parent e5263f1 commit db001cc
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 43 deletions.
1 change: 1 addition & 0 deletions cmd/lint/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ var flagRules = []linter.FlagRule{
"gcp-project-id",
"if-not-exists",
"kafka-api-key",
"kafka-rest-port",
"local-secrets-file",
"log-exclude-rows",
"max-block-ms",
Expand Down
136 changes: 98 additions & 38 deletions internal/cmd/local/command_kafka_start.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package local

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net"
"os/exec"
"runtime"
"strconv"
Expand All @@ -26,21 +27,25 @@ import (
"github.com/confluentinc/cli/internal/pkg/output"
)

var (
statusBlacklist = []string{"Pulling fs layer", "Waiting", "Downloading", "Download complete", "Verifying Checksum", "Extracting", "Pull complete"}
)

type imagePullOut struct {
Status string `json:"status"`
type ImagePullResponse struct {
Status string `json:"status"`
Error string `json:"error,omitempty"`
Progress string `json:"progress,omitempty"`
ID string `json:"id,omitempty"`
}

func (c *Command) newKafkaStartCommand() *cobra.Command {
return &cobra.Command{
cmd := &cobra.Command{
Use: "start",
Short: "Start a single-node instance of Apache Kafka.",
Args: cobra.NoArgs,
RunE: c.kafkaStart,
}

cmd.Flags().String("kafka-rest-port", "8082", "The port number for Kafka REST.")
cmd.Flags().String("plaintext-port", "", "The port number for plaintext producer and consumer clients. If not specified, a random free port will be used.")

return cmd
}

func (c *Command) kafkaStart(cmd *cobra.Command, args []string) error {
Expand Down Expand Up @@ -91,31 +96,29 @@ func (c *Command) kafkaStart(cmd *cobra.Command, args []string) error {
}
defer out.Close()

buf := new(strings.Builder)
_, err = io.Copy(buf, out)
if err != nil {
return err
}

for _, ss := range strings.Split(buf.String(), "\n") {
var output imagePullOut
if err := json.Unmarshal([]byte(ss), &output); err != nil {
continue
}
var inBlacklist bool
for _, s := range statusBlacklist {
if output.Status == s {
inBlacklist = true
}
scanner := bufio.NewScanner(out)
for scanner.Scan() {
response := new(ImagePullResponse)
text := scanner.Text()
if err := json.Unmarshal([]byte(text), &response); err != nil {
return err
}
if !inBlacklist {
fmt.Printf("%v\n", output.Status)
if response.Status == "Downloading" {
output.Printf("\rDownloading: %s", response.Progress)
} else if response.Status == "Extracting" {
output.Printf("\rExtracting: %s", response.Progress)
} else {
output.Printf("\n%s", response.Status)
}
}
if err := scanner.Err(); err != nil {
return err
}
output.Println("\r")

log.CliLogger.Tracef("Pull confluent-local image success")

if err := c.prepareAndSaveLocalPorts(c.Config.IsTest); err != nil {
if err := c.prepareAndSaveLocalPorts(cmd, c.Config.IsTest); err != nil {
return err
}

Expand Down Expand Up @@ -164,42 +167,99 @@ func (c *Command) kafkaStart(cmd *cobra.Command, args []string) error {
}

output.Printf("Started Confluent Local container %v.\nTo continue your Confluent Local experience, run `confluent local kafka topic create test` and `confluent local kafka topic produce test`.\n", getShortenedContainerId(createResp.ID))
return nil

table := output.NewTable(cmd)
table.Add(c.Config.LocalPorts)
return table.Print()
}

func (c *Command) prepareAndSaveLocalPorts(isTest bool) error {
func (c *Command) prepareAndSaveLocalPorts(cmd *cobra.Command, isTest bool) error {
if c.Config.LocalPorts != nil {
return nil
}

if isTest {
c.Config.LocalPorts = &v1.LocalPorts{
KafkaRestPort: "2996",
PlaintextPort: "2997",
BrokerPort: "2998",
ControllerPort: "2999",
BrokerPort: "2996",
ControllerPort: "2997",
KafkaRestPort: "2998",
PlaintextPort: "2999",
}
} else {
freePorts, err := freeport.GetFreePorts(4)
freePorts, err := freeport.GetFreePorts(3)
if err != nil {
return err
}

c.Config.LocalPorts = &v1.LocalPorts{
KafkaRestPort: strconv.Itoa(freePorts[0]),
PlaintextPort: strconv.Itoa(freePorts[1]),
BrokerPort: strconv.Itoa(freePorts[2]),
ControllerPort: strconv.Itoa(freePorts[3]),
KafkaRestPort: strconv.Itoa(8082),
PlaintextPort: strconv.Itoa(freePorts[0]),
BrokerPort: strconv.Itoa(freePorts[1]),
ControllerPort: strconv.Itoa(freePorts[2]),
}

kafkaRestPort, err := cmd.Flags().GetString("kafka-rest-port")
if err != nil {
return err
}
if kafkaRestPort != "" {
c.Config.LocalPorts.KafkaRestPort = kafkaRestPort
}

plaintextPort, err := cmd.Flags().GetString("plaintext-port")
if err != nil {
return err
}
if plaintextPort != "" {
c.Config.LocalPorts.PlaintextPort = plaintextPort
}
}

if err := c.validateCustomizedPorts(); err != nil {
return err
}

if err := c.Config.Save(); err != nil {
return errors.Wrap(err, "failed to save local ports to configuration file")
}

return nil
}

func (c *Command) validateCustomizedPorts() error {
kafkaRestLn, err := net.Listen("tcp", ":"+c.Config.LocalPorts.KafkaRestPort)
if err != nil {
freePort, err := freeport.GetFreePort()
if err != nil {
return err
}
invalidKafkaRestPort := c.Config.LocalPorts.KafkaRestPort
c.Config.LocalPorts.KafkaRestPort = strconv.Itoa(freePort)
log.CliLogger.Warnf("Kafka REST port %s is not available, using port %d instead.", invalidKafkaRestPort, freePort)
} else {
if err := kafkaRestLn.Close(); err != nil {
return err
}
}

plaintextLn, err := net.Listen("tcp", ":"+c.Config.LocalPorts.PlaintextPort)
if err != nil {
freePort, err := freeport.GetFreePort()
if err != nil {
return err
}
invalidPlaintextPort := c.Config.LocalPorts.PlaintextPort
c.Config.LocalPorts.PlaintextPort = strconv.Itoa(freePort)
log.CliLogger.Warnf("Plaintext port %s is not available, using port %d instead.", invalidPlaintextPort, freePort)
} else {
if err := plaintextLn.Close(); err != nil {
return err
}
}

return nil
}

func getContainerEnvironmentWithPorts(ports *v1.LocalPorts) []string {
return []string{
"KAFKA_BROKER_ID=1",
Expand Down
8 changes: 4 additions & 4 deletions internal/pkg/config/v1/local_ports.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package v1

type LocalPorts struct {
KafkaRestPort string `json:"kafka_rest_port"`
BrokerPort string `json:"broker_port"`
ControllerPort string `json:"controller_port"`
PlaintextPort string `json:"plaintext_port"`
BrokerPort string `human:"Broker Port" json:"broker_port"`
ControllerPort string `human:"Controller Port" json:"controller_port"`
KafkaRestPort string `human:"Kafka Rest Port" json:"kafka_rest_port"`
PlaintextPort string `human:"Plaintext Port" json:"plaintext_port"`
}
4 changes: 4 additions & 0 deletions test/fixtures/output/local/kafka/start-help-onprem.golden
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ Start a single-node instance of Apache Kafka.
Usage:
confluent local kafka start [flags]

Flags:
--kafka-rest-port string The port number for Kafka REST. (default "8082")
--plaintext-port string The port number for plaintext producer and consumer clients. If not specified, a random free port will be used.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
Expand Down
4 changes: 4 additions & 0 deletions test/fixtures/output/local/kafka/start-help.golden
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@ Start a single-node instance of Apache Kafka.
Usage:
confluent local kafka start [flags]

Flags:
--kafka-rest-port string The port number for Kafka REST. (default "8082")
--plaintext-port string The port number for plaintext producer and consumer clients. If not specified, a random free port will be used.

Global Flags:
-h, --help Show help for this command.
--unsafe-trace Equivalent to -vvvv, but also log HTTP requests and responses which might contain plaintext secrets.
Expand Down
6 changes: 6 additions & 0 deletions test/fixtures/output/local/kafka/start.golden
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@ Digest: sha256:[a-zA-Z0-9]*.
Status: (?:Image is up to date|Downloaded newer image) for confluentinc/confluent-local:latest
Started Confluent Local container [a-zA-Z0-9]*.
To continue your Confluent Local experience, run `confluent local kafka topic create test` and `confluent local kafka topic produce test`.
+-----------------+------+
| Broker Port | 2996 |
| Controller Port | 2997 |
| Kafka Rest Port | 2998 |
| Plaintext Port | 2999 |
+-----------------+------+
2 changes: 1 addition & 1 deletion test/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (s *CLITestSuite) TestLocalKafka() {
s.runIntegrationTest(test)
}

time.Sleep(25 * time.Second)
time.Sleep(40 * time.Second)

tests2 := []CLITest{
{args: "local kafka topic create test", fixture: "local/kafka/topic/create.golden"},
Expand Down

0 comments on commit db001cc

Please sign in to comment.