Skip to content

Commit

Permalink
feat: add keys into kafka header while producing (#2143)
Browse files Browse the repository at this point in the history
Signed-off-by: Vigith Maurice <[email protected]>
  • Loading branch information
vigith authored Oct 11, 2024
1 parent 1f0ef3f commit 271e459
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 0 deletions.
10 changes: 10 additions & 0 deletions docs/user-guide/sinks/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,16 @@

A `Kafka` sink is used to forward the messages to a Kafka topic. Kafka sink supports configuration overrides.

## Kafka Headers

We will insert `keys` into the Kafka header, but since `keys` is an array, we will add `keys` into the header in the
following format.

* `__keys_len` will have the number of `key` in the header. if `__keys_len` == `0`, means no `keys` are present.
* `__keys_%d` will have the `key`, e.g., `__key_0` will be the first key, and so forth.

## Example

```yaml
spec:
vertices:
Expand Down
22 changes: 22 additions & 0 deletions pkg/sinks/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,32 @@ func (tk *ToKafka) Write(_ context.Context, messages []isb.Message) ([]isb.Offse
}
}
}()

for index, msg := range messages {
// insert keys in the header.
// since keys is an array, to decompose it, we need len and key at each index.
var headers []sarama.RecordHeader
// insert __key_len
keyLen := sarama.RecordHeader{
Key: []byte("__key_len"),
Value: []byte(fmt.Sprintf("%d", len(msg.Keys))),
}
headers = append(headers, keyLen)

// write keys into header if length > 0
if len(msg.Keys) > 0 {
for idx, key := range msg.Keys {
headers = append(headers, sarama.RecordHeader{
Key: []byte(fmt.Sprintf("__key_%d", idx)),
Value: []byte(key),
})
}
}

message := &sarama.ProducerMessage{
Topic: tk.topic,
Value: sarama.ByteEncoder(msg.Payload),
Headers: headers,
Metadata: index, // Use metadata to identify if it succeeds or fails in the async return.
}
tk.producer.Input() <- message
Expand Down

0 comments on commit 271e459

Please sign in to comment.