diff --git a/internal/kafka/command_topic_produce.go b/internal/kafka/command_topic_produce.go index 73c48b3266..b3f1b8ae18 100644 --- a/internal/kafka/command_topic_produce.go +++ b/internal/kafka/command_topic_produce.go @@ -89,6 +89,15 @@ func (c *command) produce(cmd *cobra.Command, args []string) error { return err } + parseKey, err := cmd.Flags().GetBool("parse-key") + if err != nil { + return err + } + + if cmd.Flags().Changed("key-format") && !parseKey { + return errors.New("`--parse-key` must be set when `key-format` is set") + } + configFile, err := cmd.Flags().GetString("config-file") if err != nil { return err diff --git a/internal/kafka/command_topic_produce_onprem.go b/internal/kafka/command_topic_produce_onprem.go index 0bb0d67a56..a350b6e79f 100644 --- a/internal/kafka/command_topic_produce_onprem.go +++ b/internal/kafka/command_topic_produce_onprem.go @@ -105,6 +105,15 @@ func (c *command) produceOnPrem(cmd *cobra.Command, args []string) error { return err } + parseKey, err := cmd.Flags().GetBool("parse-key") + if err != nil { + return err + } + + if cmd.Flags().Changed("key-format") && !parseKey { + return errors.New("`--parse-key` must be set when `key-format` is set") + } + keySchema, err := cmd.Flags().GetString("key-schema") if err != nil { return err diff --git a/pkg/serdes/integer_deserialization_provider.go b/pkg/serdes/integer_deserialization_provider.go index a34c7b3863..b5860c1051 100644 --- a/pkg/serdes/integer_deserialization_provider.go +++ b/pkg/serdes/integer_deserialization_provider.go @@ -12,5 +12,9 @@ func (s *IntegerDeserializationProvider) LoadSchema(_ string, _ map[string]strin } func (s *IntegerDeserializationProvider) Deserialize(data []byte) (string, error) { - return fmt.Sprintf("%d", binary.BigEndian.Uint32(data)), nil + if len(data) == 0 { + return "", nil + } + + return fmt.Sprintf("%d", binary.LittleEndian.Uint32(data)), nil }