-
Notifications
You must be signed in to change notification settings - Fork 30
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adds support for Kafka Keys, and handles nil for both keys and values… #39
base: main
Are you sure you want to change the base?
Conversation
… as kafka would expect
kafka-http-connector/main.go
Outdated
key []byte | ||
) | ||
tombstone := false | ||
var cleaned []sarama.RecordHeader |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cleaned is essentially any other key the producer sets that are not keda/fission related, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes - couldn't think of a better name.
conn.logger.Warn("Sending a Tombstone") | ||
} | ||
|
||
_, _, err := conn.producer.SendMessage(message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it make sense for us to do the same for error topic as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, will add that.
@ranoble Thanks a lot for this PR ! This would be such a nice addition to the connector. I have asked a few questions, please take a look and get this merged :) |
…pic. This also allows a function to return headers to provide more information about an error, and these will be included in the Kafka message headers
common/util.go
Outdated
@@ -92,7 +92,7 @@ func HandleHTTPRequest(message string, headers http.Header, data ConnectorMetada | |||
} | |||
|
|||
if resp.StatusCode < 200 && resp.StatusCode > 300 { | |||
return nil, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) | |||
return resp, fmt.Errorf("request returned failure: %v. http_endpoint: %v, source: %v", resp.StatusCode, data.HTTPEndpoint, data.SourceName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ranoble If status code isn't in that range, it's not a successful request and that's why I had returned nil
. Can you please explain why this might be valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To write the key, I need the key header. Even though the function fails, it still returns a valid result (just not the one we wanted).
This could / would support distributed tracing, or the option to pass back useful debug info, so that the consumer of the error topic can take the correct action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. In that case, can you please do the following?
- Raise the PR in a separate branch as the common package is used by all connectors
- Change the function return arguments to something better than it currently is. I am thinking status code, response and error. What are your thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More than happy to.
- Sure - will do so.
- I'm a little more nervous about this one. At this point this change is backwards compatible at the moment,
resp
can be safely ignored by any client. After changing the return arguments, if someone is basing a custom connector off of this and upgrades that will cause a breaking change in their code (of course this may not actually be an issue in practise).
Happy to change it on your suggestion - but wanted to highlight that.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's true. Any change in the common package will have to be adopted by the remaining connectors . @therahulbhati
What are your thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, @RealHarshThakur it's true. We should have this as a separate PR, we would also need to incorporate those changes for each connector as part of that PR.
.github/workflows/ci.yml
Outdated
@@ -39,7 +39,7 @@ jobs: | |||
helm repo add kedacore https://kedacore.github.io/charts | |||
helm repo update | |||
kubectl create namespace keda | |||
helm install keda kedacore/keda --namespace keda | |||
helm install keda kedacore/keda --namespace keda --version 1.5.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ranoble We can remove --version 1.5.0
flag specifying the version
@ranoble Could you please resolve the merge conflicts? |
… as kafka would expect
Keys are passed as a header:
KEDA-Message-Key
. If no key exists, no header will be sent (to avoid the nil == '' problem). If a response header ofKEDA-Message-Key
exists then it is used in the response topic, if not; then no key is set so that the round robin partitioner is used as opposed to a hashed blank string.In the event a Tombstone is sent as a value of a incoming event, the value of the request is a blank string, but an additional header is set -
KEDA-Message-Tombstone
.For a tombstone to be published to the response topic, both the header value needs to be set AND the body must be empty.
I'm not super happy with the headers, but it felt like the simplest most idiomatic method for achieving this.