From 524c8ee45851064244f8d1b493da2aa383fd8432 Mon Sep 17 00:00:00 2001 From: Steven Gagniere <108363707+sgagniere@users.noreply.github.com> Date: Thu, 6 Jul 2023 14:35:05 -0700 Subject: [PATCH] [CLI-2569] Check for magic byte when consuming avro/protobuf/jsonschema messages (#2028) --- internal/cmd/kafka/confluent_kafka.go | 5 ++++- internal/pkg/errors/error_message.go | 1 - 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/internal/cmd/kafka/confluent_kafka.go b/internal/cmd/kafka/confluent_kafka.go index 6fe3436e70..6ba51ab964 100644 --- a/internal/cmd/kafka/confluent_kafka.go +++ b/internal/cmd/kafka/confluent_kafka.go @@ -274,8 +274,11 @@ func RunConsumer(consumer *ckafka.Consumer, groupHandler *GroupHandler) error { } func (h *GroupHandler) RequestSchema(value []byte) (string, map[string]string, error) { + if len(value) == 0 || value[0] != 0x0 { + return "", nil, errors.NewErrorWithSuggestions("unknown magic byte", fmt.Sprintf("Check that all messages from this topic are in the %s format.", h.Format)) + } if len(value) < messageOffset { - return "", nil, errors.New(errors.FailedToFindSchemaIDErrorMsg) + return "", nil, errors.New("failed to find schema ID in topic data") } // Retrieve schema from cluster only if schema is specified. diff --git a/internal/pkg/errors/error_message.go b/internal/pkg/errors/error_message.go index 1c8266e79c..896358956d 100644 --- a/internal/pkg/errors/error_message.go +++ b/internal/pkg/errors/error_message.go @@ -177,7 +177,6 @@ const ( TopicExistsOnPremErrorMsg = `topic "%s" already exists for the Kafka cluster` TopicExistsOnPremSuggestions = "To list topics for the cluster, use `confluent kafka topic list --url `." FailedToProduceErrorMsg = "failed to produce offset %d: %s\n" - FailedToFindSchemaIDErrorMsg = "failed to find schema ID in topic data" MissingKeyErrorMsg = "missing key in message" UnknownValueFormatErrorMsg = "unknown value schema format" TopicExistsErrorMsg = `topic "%s" already exists for Kafka cluster "%s"`