-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
DM-43014: Reduce alert stream credentials timeout #195
Conversation
7234eb0
to
2e59d02
Compare
2e59d02
to
f459dd6
Compare
# should never be committed to source code. | ||
"sasl.username": self.username, | ||
"sasl.password": self.password, | ||
# Batch size limits the largest size of a kafka alert that can be sent. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove leftover comment? There's no config here for size.
@@ -293,6 +313,7 @@ def produceAlerts(self, alerts, ccdVisitId): | |||
ccdVisitId of the alerts sent to the alert stream. Used to write | |||
out alerts which fail to be sent to the alert stream. | |||
""" | |||
self._server_check() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hope we can eventually drop these and just use the server check on startup, as this adds latency. But let's leave it for now while we get things running.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can afford 2×0.5 seconds, and can always tighten the timeout a little. I'm a bit disappointed that this won't help with interrupted connections, but I'd bet it's still a net gain on expected running time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm still working on a solution for interrupted connections, but figured it was important to get this in place for the test/code freeze.
|
||
except KafkaException as e: | ||
self.log.error(e) | ||
raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the log-and-reraise here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I may be missing some functionality of raise, does raise log the error automatically?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All our execution frameworks should log any exceptions that get emitted from the pipeline (certainly Prompt Processing does!). It's generally considered bad practice to log an exception you're not handling, not least because of what happens if everybody does it. 🙂
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've swapped it to
except KafkaException as exception:
raise exception
However, if I didn't include the as expection
part and just did raise
by itself, would that catch the full error correctly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would, but I don't think you need that at all... you could remove that whole try
-except
block and the behavior would be the same. (It might be helpful to document that KafkaException
is expected; that usually goes in the docstring.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, that is very true. I'll add a docstring (something I should have done anyways) and remove the try/except.
f459dd6
to
d9020d5
Compare
Add check_server function to check whether or not the server is contactable. Update unit tests
d082230
to
05b5d74
Compare
The new function
_check_server
pings the kafka server and checks if there are any available topics. If the server is not contactable or if the server topics are empty, it raises and error and stops the pipeline.