Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds support for Kafka Keys, and handles nil for both keys and values… #39

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
kubectl create namespace keda
helm install keda kedacore/keda --namespace keda
helm install keda kedacore/keda --namespace keda --version 1.5.0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ranoble We can remove --version 1.5.0 flag specifying the version

- name: Create Docker Image for HTTP server
run: |
cd test/server/
Expand Down
58 changes: 54 additions & 4 deletions kafka-http-connector/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,28 @@ func getConfig(metadata kafkaMetadata) (*sarama.Config, error) {
return config, nil
}

func extractControlHeaders(headers []sarama.RecordHeader) ([]byte, bool, []sarama.RecordHeader) {
var (
key []byte
)
tombstone := false
var cleaned []sarama.RecordHeader
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cleaned is essentially any other key the producer sets that are not keda/fission related, right?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes - couldn't think of a better name.

for _, header := range headers {
if strings.ToLower(string(header.Key)) == "keda-message-key" {
key = header.Value
continue
}

if strings.ToLower(string(header.Key)) == "keda-message-tombstone" {
tombstone = true
continue
}
cleaned = append(cleaned, header)
}

return key, tombstone, cleaned
}

// kafkaConnector represents a Sarama consumer group consumer
type kafkaConnector struct {
ready chan bool
Expand Down Expand Up @@ -209,7 +231,7 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
// The `ConsumeClaim` itself is called within a goroutine, see:
// https://github.com/Shopify/sarama/blob/master/consumer_group.go#L27-L29
for message := range claim.Messages() {
conn.logger.Info(fmt.Sprintf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic))
conn.logger.Info(fmt.Sprintf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", string(message.Key), string(message.Value), message.Timestamp, message.Topic))
msg := string(message.Value)

headers := http.Header{
Expand All @@ -220,6 +242,17 @@ func (conn *kafkaConnector) ConsumeClaim(session sarama.ConsumerGroupSession, cl
"KEDA-Source-Name": {conn.connectorData.SourceName},
}

// Add the message key, if it's been set.
if message.Key != nil {
headers.Add("KEDA-Message-Key", string(message.Key))
}

// Indicate that this is a tombstone, not a empty message.
// Normally indicative of a deletion request
if message.Value == nil {
headers.Add("KEDA-Message-Tombstone", "true")
}

// Set the headers came from Kafka record
for _, h := range message.Headers {
headers.Add(string(h.Key), string(h.Value))
Expand Down Expand Up @@ -275,12 +308,29 @@ func (conn *kafkaConnector) errorHandler(err error) {
}

func (conn *kafkaConnector) responseHandler(msg string, headers []sarama.RecordHeader) bool {

// extract the key and tombstone should they exist.
key, tombstone, headers := extractControlHeaders(headers)

if len(conn.connectorData.ResponseTopic) > 0 {
_, _, err := conn.producer.SendMessage(&sarama.ProducerMessage{
message := &sarama.ProducerMessage{
Topic: conn.connectorData.ResponseTopic,
Value: sarama.StringEncoder(msg),
Headers: headers,
})
}

if key != nil {
message.Key = sarama.StringEncoder(key)
}

if len(msg) > 0 || !tombstone {
message.Value = sarama.StringEncoder(msg)
}

if tombstone {
conn.logger.Warn("Sending a Tombstone")
ranoble marked this conversation as resolved.
Show resolved Hide resolved
}

_, _, err := conn.producer.SendMessage(message)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense for us to do the same for error topic as well?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, will add that.

if err != nil {
conn.logger.Warn("failed to publish response body from http request to topic",
zap.Error(err),
Expand Down