diff --git a/go.sum b/go.sum index f21aa6b993..44d6cdf51b 100644 --- a/go.sum +++ b/go.sum @@ -219,8 +219,6 @@ github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0 h1:QqtIFEB5E3CIyGMJd7NQBEt github.com/confluentinc/ccloud-sdk-go-v2/flink v0.9.0/go.mod h1:GPj4sfR85OyiFQUMNEq1DtPOjYVAuE222Z6Mcapwa48= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0 h1:2QuFhvrfU4AdxyfWWPFY0fqEg8p8wmKFfC6N+35pxHg= github.com/confluentinc/ccloud-sdk-go-v2/flink-artifact v0.1.0/go.mod h1:cl7LEL6bFgiXQ+8sEZvo3BrYZxDOvGkx4jV7eX1ssN4= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.12.0 h1:fra7uBCCtYkUFtHb6ununxMWqYuVonG52t1mV9o7qRE= -github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.12.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0 h1:8Y1uXjolI2d5mawcfLn4OfJ81WRMQpjMFWdBm3dLdrk= github.com/confluentinc/ccloud-sdk-go-v2/flink-gateway v0.17.0/go.mod h1:cJ6erfVlWSyz6L+2dR46cF2+s5I2r+pTNrPm2fNbcqU= github.com/confluentinc/ccloud-sdk-go-v2/iam v0.11.0 h1:ZUAow4L6De1FwYoiwvEodm4lvxc+46wNW+IEAb7K9VU= diff --git a/internal/asyncapi/command_export.go b/internal/asyncapi/command_export.go index 32c0a66b5e..a8caa65a3c 100644 --- a/internal/asyncapi/command_export.go +++ b/internal/asyncapi/command_export.go @@ -60,8 +60,6 @@ type flags struct { topics []string } -// messageOffset is 5, as the schema ID is stored at the [1:5] bytes of a message as meta info (when valid) -const messageOffset = 5 const protobufErrorMessage = "protobuf is not supported" func (c *command) newExportCommand() *cobra.Command { @@ -292,8 +290,6 @@ func (c *command) getMessageExamples(consumer *ckgo.Consumer, topicName, content if err != nil { return nil, err } - // Message body is encoded after 5 bytes of meta information. - value = value[messageOffset:] if err := deserializationProvider.LoadSchema(schemaPath, referencePathMap); err != nil { return nil, err } diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 76abd4f3e6..f454e9a6d5 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -323,8 +323,8 @@ func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { // Initialize the value serializer with the same SR endpoint during registration // The associated schema ID is also required to initialize the serializer var valueSchemaId = -1 - if len(valueMetaInfo) >= 5 { - valueSchemaId = int(binary.BigEndian.Uint32(valueMetaInfo[1:5])) + if len(valueMetaInfo) >= messageOffset { + valueSchemaId = int(binary.BigEndian.Uint32(valueMetaInfo[1:messageOffset])) } if err := valueSerializer.InitSerializer(srEndpoint, srClusterId, "value", srApiKey, srApiSecret, token, valueSchemaId); err != nil { return err @@ -485,7 +485,7 @@ func getProduceMessage(cmd *cobra.Command, keyMetaInfo, valueMetaInfo []byte, to return message, nil } -func serializeMessage(keyMetaInfo, valueMetaInfo []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]byte, []byte, error) { +func serializeMessage(_, _ []byte, topic, data, delimiter string, parseKey bool, keySerializer, valueSerializer serdes.SerializationProvider) ([]byte, []byte, error) { var serializedKey []byte val := data if parseKey { @@ -508,7 +508,7 @@ func serializeMessage(keyMetaInfo, valueMetaInfo []byte, topic, data, delimiter return nil, nil, err } - return append(keyMetaInfo, serializedKey...), append(valueMetaInfo, serializedValue...), nil + return serializedKey, serializedValue, nil } func getKeyAndValue(schemaBased bool, data, delimiter string) (string, string, error) { diff --git a/internal/kafka/confluent_kafka.go b/internal/kafka/confluent_kafka.go index 05079662fd..3aaf3575b4 100644 --- a/internal/kafka/confluent_kafka.go +++ b/internal/kafka/confluent_kafka.go @@ -209,7 +209,6 @@ func consumeMessage(message *ckgo.Message, h *GroupHandler) error { if err != nil { return err } - message.Key = message.Key[messageOffset:] if err := keyDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil { return err } @@ -243,8 +242,6 @@ func consumeMessage(message *ckgo.Message, h *GroupHandler) error { if err != nil { return err } - // Message body is encoded after 5 bytes of meta information. - message.Value = message.Value[messageOffset:] if err := valueDeserializer.LoadSchema(schemaPath, referencePathMap); err != nil { return err }