Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[CLI-3355] Fix some confluent kafka topic consume errors #2970

Merged
merged 8 commits into from
Dec 16, 2024
29 changes: 20 additions & 9 deletions internal/kafka/command_topic_consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,9 +173,13 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
token, err := auth.GetDataplaneToken(c.Context)
if err != nil {
return err

var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return err
}
sgagniere marked this conversation as resolved.
Show resolved Hide resolved
}

consumer, err := newConsumer(group, cluster, c.clientID, configFile, config)
Expand Down Expand Up @@ -228,7 +232,13 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
return err
}

srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return err
}

var srClient *schemaregistry.Client
var srClusterId string
if slices.Contains(serdes.SchemaBasedFormats, valueFormat) || slices.Contains(serdes.SchemaBasedFormats, keyFormat) {
// Only initialize client and context when schema is specified.
srClient, err = c.GetSchemaRegistryClient(cmd)
Expand All @@ -239,6 +249,13 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
return err
}
}
// Fetch the current SR cluster id and endpoint
if srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint()
if err != nil {
return err
}
}
sgagniere marked this conversation as resolved.
Show resolved Hide resolved
}

schemaPath, err := createTempDir()
Expand All @@ -258,12 +275,6 @@ func (c *command) consumeCloud(cmd *cobra.Command, args []string) error {
subject = schemaRegistryContext
}

// Fetch the current SR cluster id and endpoint
srClusterId, srEndpoint, err := c.GetCurrentSchemaRegistryClusterIdAndEndpoint()
if err != nil {
return err
}

groupHandler := &GroupHandler{
SrClient: srClient,
SrApiKey: srApiKey,
Expand Down
19 changes: 15 additions & 4 deletions internal/kafka/command_topic_produce.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,11 +635,19 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (
}

// Fetch the SR client endpoint during schema registration
srClusterId, srEndpoint, err := c.GetCurrentSchemaRegistryClusterIdAndEndpoint()
srEndpoint, err := cmd.Flags().GetString("schema-registry-endpoint")
if err != nil {
return nil, nil, err
}

var srClusterId string
if (schemaId.IsSet() || schema != "") && srEndpoint == "" {
srClusterId, srEndpoint, err = c.GetCurrentSchemaRegistryClusterIdAndEndpoint()
if err != nil {
return nil, nil, err
}
}

sgagniere marked this conversation as resolved.
Show resolved Hide resolved
// Initialize the serializer with the same SR endpoint during registration
// The associated schema ID is also required to initialize the serializer
srApiKey, err := cmd.Flags().GetString("schema-registry-api-key")
Expand All @@ -655,9 +663,12 @@ func (c *command) initSchemaAndGetInfo(cmd *cobra.Command, topic, mode string) (
parsedSchemaId = int(binary.BigEndian.Uint32(metaInfo[1:5]))
}

token, err := auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
var token string
if c.Config.IsCloudLogin() { // Do not get token if users are consuming from Cloud while logged out
token, err = auth.GetDataplaneToken(c.Context)
if err != nil {
return nil, nil, err
}
}
err = serializationProvider.InitSerializer(srEndpoint, srClusterId, mode, srApiKey, srApiSecret, token, parsedSchemaId)
if err != nil {
Expand Down