-
Hi, I created a streaming app that reads messages from a multi-schema topic (Avro schemas are used) and sends each message type to its own topic, filtering out the message types that we don't want to stream. I forked the Streamiz repo, created my app there, and added a test project to investigate how to use dependency injection. I then reused the tests to explore error handling: My app will have multiple StreamTasks (one per multi-schema topic, ex. I'm not sure how likely it will be for an exception to be thrown in the callback functions I wrote for
I understand that question 2. would eventually require handling the error after exhausting a limited number of retries. At that point I will probably need to persist the message in a store, reprocess it later on, and let the stream continue. It would still be nice to have a retry capability before putting the message in some error store. Another error that could happen is a stream task publishing a message to a single-message topic when the message breaks the Avro schema (i.e., it is not compatible with the previous schema). In my tests I am setting up the producer mock to throw an exception: https://github.com/bmmptlgc/kafka-streams-dotnet/blob/d70c1017254761236a418764c19b2c6365bcf8c1/test/Sample.Kafka.Supplier.DI.UnitTests/KafkaTopicSplitter/When_consuming_a_message_with_a_type_configured_to_publish_to_a_single_schema_topic.cs#L56
Thanks in advance,
-
Hey @bmmptlgc ,
Correct
No. Today the topology can only fail or continue (that is, skip the current message). But you can implement your own pattern with a try/catch in your lambda and push the invalid record to a DLQ topic. This DLQ topic can then be processed by a dedicated KStreams application, or by a dedicated consumer, that implements your retry logic with a specific number of retries and/or retry interval.
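The try/catch plus DLQ pattern described above can be sketched independently of any library. This is a minimal illustration in Python, not Streamiz code: `process_with_retry`, `handler`, and `dead_letters` are hypothetical names, and the in-memory list stands in for a real DLQ topic.

```python
import time
from typing import Callable, List, Optional, Tuple


def process_with_retry(
    record: str,
    handler: Callable[[str], str],
    dead_letters: List[Tuple[str, str]],
    max_attempts: int = 3,
    backoff_seconds: float = 0.0,
) -> Optional[str]:
    """Run handler on record; after max_attempts failures, park the record.

    Returns the handler result, or None when the record was dead-lettered.
    """
    last_error = ""
    for attempt in range(1, max_attempts + 1):
        try:
            return handler(record)
        except Exception as exc:  # broad on purpose: the stream must keep going
            last_error = repr(exc)
            if attempt < max_attempts:
                # Linear backoff between attempts (no sleep after the last one).
                time.sleep(backoff_seconds * attempt)
    # Retries exhausted: keep the record and the failure reason, then move on.
    dead_letters.append((record, last_error))
    return None
```

In a stream topology, the `None` results would be filtered out before the sink, and the dead-lettered records would be produced to the DLQ topic instead of being appended to a list.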
If you return FAIL, the current stream thread will shut down properly and die. If you have another stream thread in your instance, it will continue to work unless it processes your invalid record. In that case, it will probably fail as well.
The Hope it helps.
-
Hi @bmmptlgc, The You can parallelise your processing via It will not close the specific "stream" of Btw, I'm currently conducting a satisfaction survey to understand how I can better serve you, and I would love to get your feedback on the product. Your insights are invaluable and will help us shape the future of our product to better meet your needs. The survey will only take a few minutes, and your responses will be completely confidential. Thank you for your time and feedback! Best regards,
-
I was able to simulate ProductionExceptionHandler in my tests by setting my producer mock to throw ProduceException. I don't see that the deliveryReport passed to this handler contains the original exception (ProduceException), and I am trying to determine when I should retry or fail. Is there a recommended way to return RETRY by inspecting properties of report.Error (since Error.IsRetriable is internal 😢)?
-
Hey @bmmptlgc , Some errors are neither retriable nor manageable, for instance if you don't have the authorisation to publish to this topic. In that case it doesn't make sense to CONTINUE or RETRY, because the next time you will get the same exception. So this is the workflow to manage ProductionException :
Hope it's clear
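The split between retriable and non-retriable errors mentioned above can be sketched as a small decision function. This is an illustration only, not the Streamiz handler signature: the `Response` enum and `decide` function are hypothetical, and the error codes are plain strings named after Kafka protocol error codes rather than properties read from `report.Error`.

```python
from enum import Enum


class Response(Enum):
    FAIL = "FAIL"
    CONTINUE = "CONTINUE"  # listed for completeness; never chosen in this sketch
    RETRY = "RETRY"


# Assumption: an illustrative subset of transient errors, where a later
# attempt may succeed without any change on the producer side.
TRANSIENT = {
    "REQUEST_TIMED_OUT",
    "NOT_ENOUGH_REPLICAS",
    "LEADER_NOT_AVAILABLE",
    "NETWORK_EXCEPTION",
}


def decide(error_code: str, attempts_so_far: int, max_attempts: int = 5) -> Response:
    """Retry transient errors a bounded number of times, otherwise fail."""
    if error_code in TRANSIENT and attempts_so_far < max_attempts:
        return Response.RETRY
    # Permanent errors (e.g. TOPIC_AUTHORIZATION_FAILED), unknown errors and
    # exhausted retries all fail: repeating them would hit the same exception.
    return Response.FAIL
```

Treating unknown codes as FAIL is the conservative choice here; whether some of them should be skipped or dead-lettered instead depends on the application.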
-
Another topic related to error handling is observability. I was able to set it up on my topic splitter, and the "Streamiz" metrics are being exported by my company's own observability implementation. All I had to do, aside from configuring OpenTelemetry like we do for any other application and adding the "Streamiz" meter to the builder, was to set up the metrics reporter this way:
Then, when the metrics are exported (by default every 30 seconds) I get metrics exported like this one, for example:
However, if my stream fails and shuts down, any metrics that were added to the sensors collection after the last export and before the shutdown are not exported. Should the Streamiz code call
-
Still on the topic of observability, I couldn't see any sensors recording failed messages. One of my requirements is to record metrics about failures. The only place I can do this, that I know of, is in the
Yes, you are probably right. The metrics export is called regularly, but if your stream fails between two exports, the last metric values are not exported.
This could be a great enhancement. Feel free to create a specific GH issue for it.
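For the eventual issue, the requested behaviour can be sketched as a periodic exporter that performs one final flush when it is closed. This is a library-agnostic illustration in Python, not how Streamiz implements its reporter: `PeriodicExporter` and its methods are hypothetical names, and the 30-second default only mirrors the export interval mentioned earlier in the thread.

```python
import threading
from typing import Callable, Dict


class PeriodicExporter:
    """Exports recorded metrics on a timer and once more on close()."""

    def __init__(self, export: Callable[[Dict[str, float]], None],
                 interval_seconds: float = 30.0) -> None:
        self._export = export
        self._interval = interval_seconds
        self._pending: Dict[str, float] = {}
        self._lock = threading.Lock()
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self) -> None:
        self._thread.start()

    def record(self, name: str, value: float) -> None:
        with self._lock:
            self._pending[name] = value

    def flush(self) -> None:
        # Swap the buffer under the lock, export outside of it.
        with self._lock:
            snapshot, self._pending = self._pending, {}
        if snapshot:
            self._export(snapshot)

    def _run(self) -> None:
        # Event.wait returns True once stop is set, which ends the loop.
        while not self._stop.wait(self._interval):
            self.flush()

    def close(self) -> None:
        self._stop.set()
        if self._thread.is_alive():
            self._thread.join()
        # Final flush: values recorded since the last tick are not lost.
        self.flush()
```

Calling `close()` from the shutdown path, including the failure path, is what closes the gap between the last periodic export and the moment the stream stops.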