Skip to content

Commit

Permalink
[CLI-2569] Check for magic byte when consuming avro/protobuf/jsonsche…
Browse files Browse the repository at this point in the history
…ma messages (#2028)
  • Loading branch information
sgagniere authored Jul 6, 2023
1 parent db7fd27 commit 524c8ee
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
5 changes: 4 additions & 1 deletion internal/cmd/kafka/confluent_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,11 @@ func RunConsumer(consumer *ckafka.Consumer, groupHandler *GroupHandler) error {
}

func (h *GroupHandler) RequestSchema(value []byte) (string, map[string]string, error) {
if len(value) == 0 || value[0] != 0x0 {
return "", nil, errors.NewErrorWithSuggestions("unknown magic byte", fmt.Sprintf("Check that all messages from this topic are in the %s format.", h.Format))
}
if len(value) < messageOffset {
return "", nil, errors.New(errors.FailedToFindSchemaIDErrorMsg)
return "", nil, errors.New("failed to find schema ID in topic data")
}

// Retrieve schema from cluster only if schema is specified.
Expand Down
1 change: 0 additions & 1 deletion internal/pkg/errors/error_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ const (
TopicExistsOnPremErrorMsg = `topic "%s" already exists for the Kafka cluster`
TopicExistsOnPremSuggestions = "To list topics for the cluster, use `confluent kafka topic list --url <url>`."
FailedToProduceErrorMsg = "failed to produce offset %d: %s\n"
FailedToFindSchemaIDErrorMsg = "failed to find schema ID in topic data"
MissingKeyErrorMsg = "missing key in message"
UnknownValueFormatErrorMsg = "unknown value schema format"
TopicExistsErrorMsg = `topic "%s" already exists for Kafka cluster "%s"`
Expand Down

0 comments on commit 524c8ee

Please sign in to comment.