From b43ca424cf93d73f1d8f96e319c594ffaca18359 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Wed, 20 Apr 2022 21:55:08 -0400 Subject: [PATCH 1/2] Adding support to produce avro encoded messages --- cmd/kaf/produce.go | 16 ++++++++++++++++ pkg/avro/schema.go | 27 +++++++++++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index 9c05fcf6..ea3ace75 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -27,6 +27,7 @@ var ( partitionFlag int32 bufferSizeFlag int inputModeFlag string + schemaID int ) func init() { @@ -46,6 +47,8 @@ func init() { produceCmd.Flags().StringVar(&timestampFlag, "timestamp", "", "Select timestamp for record") produceCmd.Flags().Int32VarP(&partitionFlag, "partition", "p", -1, "Partition to produce to") + produceCmd.Flags().IntVarP(&schemaID, "schema-id", "", -1, "Value schema id for avro messsage encoding") + produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]") produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode") } @@ -138,6 +141,13 @@ var produceCmd = &cobra.Command{ } + if schemaID != -1 { + schemaCache = getSchemaCache() + if schemaCache == nil { + errorExit("Error getting a instance of schemaCache") + } + } + var headers []sarama.RecordHeader for _, h := range headerFlag { v := strings.SplitN(h, ":", 2) @@ -166,6 +176,12 @@ var produceCmd = &cobra.Command{ } else { errorExit("Failed to load payload proto type") } + } else if schemaID != -1 { + avro, err := schemaCache.EncodeMessage(schemaID, data) + if err != nil { + errorExit("Failed to encode avro", err) + } + data = avro } var ts time.Time diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index 1b0c06e2..fe260a11 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -111,3 +111,30 @@ func (c *SchemaCache) DecodeMessage(b []byte) (message []byte, err error) { return message, nil } + +// EncodeMessage returns a 
binary representation of an Avro-encoded message. +func (c *SchemaCache) EncodeMessage(schemaID int, json []byte) (message []byte, err error) { + codec, err := c.getCodecForSchemaID(schemaID) + if err != nil { + return nil, err + } + + // Creates a header with an initial zero byte and + // the schema id encoded as a big endian uint32 + buf := make([]byte, 5) + binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID)) + + // Convert textual json data to native Go form + native, _, err := codec.NativeFromTextual(json) + if err != nil { + return nil, err + } + + // Convert native Go form to binary Avro data + message, err = codec.BinaryFromNative(buf, native) + if err != nil { + return nil, err + } + + return message, nil +} From 2e2d87e22ad14ef921300223bac7e92f6d7d29e2 Mon Sep 17 00:00:00 2001 From: Fabio Mendes Date: Thu, 21 Apr 2022 13:19:54 -0400 Subject: [PATCH 2/2] avro encoded messages - code review feedback --- cmd/kaf/produce.go | 10 +++++----- pkg/avro/schema.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cmd/kaf/produce.go b/cmd/kaf/produce.go index ea3ace75..25c071b1 100644 --- a/cmd/kaf/produce.go +++ b/cmd/kaf/produce.go @@ -27,7 +27,7 @@ var ( partitionFlag int32 bufferSizeFlag int inputModeFlag string - schemaID int + avroSchemaID int ) func init() { @@ -47,7 +47,7 @@ func init() { produceCmd.Flags().StringVar(&timestampFlag, "timestamp", "", "Select timestamp for record") produceCmd.Flags().Int32VarP(&partitionFlag, "partition", "p", -1, "Partition to produce to") - produceCmd.Flags().IntVarP(&schemaID, "schema-id", "", -1, "Value schema id for avro messsage encoding") + produceCmd.Flags().IntVarP(&avroSchemaID, "avro-schema-id", "", -1, "Value schema id for avro messsage encoding") produceCmd.Flags().StringVarP(&inputModeFlag, "input-mode", "", "line", "Scanning input mode: [line|full]") produceCmd.Flags().IntVarP(&bufferSizeFlag, "line-length-limit", "", 0, "line length limit in line input mode") @@ -141,7 +141,7 @@ var 
produceCmd = &cobra.Command{ } - if schemaID != -1 { + if avroSchemaID != -1 { schemaCache = getSchemaCache() if schemaCache == nil { errorExit("Error getting a instance of schemaCache") @@ -176,8 +176,8 @@ var produceCmd = &cobra.Command{ } else { errorExit("Failed to load payload proto type") } - } else if schemaID != -1 { - avro, err := schemaCache.EncodeMessage(schemaID, data) + } else if avroSchemaID != -1 { + avro, err := schemaCache.EncodeMessage(avroSchemaID, data) if err != nil { errorExit("Failed to encode avro", err) } diff --git a/pkg/avro/schema.go b/pkg/avro/schema.go index fe260a11..9564d315 100644 --- a/pkg/avro/schema.go +++ b/pkg/avro/schema.go @@ -119,9 +119,9 @@ func (c *SchemaCache) EncodeMessage(schemaID int, json []byte) (message []byte, return nil, err } - // Creates a header with an initial zero byte and - // the schema id encoded as a big endian uint32 - buf := make([]byte, 5) + // Creates a header with an initial zero byte and + // the schema id encoded as a big endian uint32 + buf := make([]byte, 5) binary.BigEndian.PutUint32(buf[1:5], uint32(schemaID)) // Convert textual json data to native Go form