Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Redis streams kind of replay nacks even when auto_replay_nacks is false #84

Open
bssyousefi opened this issue Jul 22, 2024 · 3 comments
Assignees

Comments

@bssyousefi
Copy link

I have a simple bento application which reads data from a Redis stream and put the data into a SQL db. I want to make sure that if for some reasons a record faced an error while trying to put the data into db, the record stays not acknowledged on Redis streams so I can easily rerun the application after I fixed the issue with the SQL insertion.
As far as I understand by default if there is an error in the output no acknowledgement will take place, however the record causing the error will keep replaying the whole pipeline. To avoid this, there is a parameter called auto_replay_nacks that could help me.
Based on my understanding, if I give value false to auto_replay_nacks I expect no failed record repeating the pipeline and also the failed record being not acknowledged. However, when I'm testing the scenario, the failed record keeps repeating the whole pipeline regardless of the value of parameter auto_replay_nacks.

As I went through the code, I found out that when the parameter auto_replay_nacks is false this part of the code is adding the failed record to the pendingMsgs, so the failed record repeats forever.

return msg.payload, func(rctx context.Context, res error) error {

return msg.payload, func(rctx context.Context, res error) error {
	f res != nil {
		r.pendingMsgsMut.Lock()
		r.pendingMsgs = append(r.pendingMsgs, msg)
		r.pendingMsgsMut.Unlock()
	} else {
		r.addAsyncAcks(msg.stream, msg.id)
	}
	return nil
}, nil

I'm not sure if my expectation from auto_replay_nacks is valid or not. I would appreciate it if you kindly let me know how I can avoid the repeating in this scenario.

For replicating the case you could use a simple input-output configuration, for which you have a record that fails in the output section.

@gregfurman
Copy link
Collaborator

Hey. Thanks for opening this issue!

I've done a bit of a deep dive here and think see the confusion, and agree that the auto-replaying functionality in the context of a component like a redis_stream is maybe not that clear. Disclaimer though: I'm no Redis expert!

So auto_replay_nacks is useful for when an ack is called with an error, but the ack handler doesn't have any error handling capabiltiies defined.

// AutoRetryNacksBatched wraps a batched input implementation with a component
// that automatically reattempts messages that fail downstream. This is useful
// for inputs that do not support nacks, and therefore don't have an answer for
// when an ack func is called with an error.
//
// When messages fail to be delivered they will be reattempted with back off
// until success or the stream is stopped.

Fundamentally, however, a redis stream requires that messages be acknowledged (via an XACK command) before it can be removed from the Redis stream. The code you linked above shows Bento adding non-errored messages to an async queue to be ACK'd asynchronously r.addAsyncAcks(msg.stream, msg.id).

On the Redis stream side, an XACK will all evict a message from a consumer group's Pending Entries List.

From the Redis streams docs:

Consuming a message, however, requires an explicit acknowledgment using a specific command. Redis interprets the acknowledgment as: this message was correctly processed so it can be evicted from the consumer group.

Now back to your original problem. You're correct in saying that when downstream messages from a redis_stream error out, that pendingMsgs are appended to with the failed message(s). Also, notice how the ack function only does this when ack is passed an error via the res parameter.

If you do not want these messages to be re-queued, you'll want to ack them in some way (or potentially even catch the error) -- signalling that Bento asynchronously discard them from the redis stream.

There are a couple of ways to do this so I'd recommend giving a look at the Error Handling
docs. Specifically, the Dropping Failed Messages section could be relevant since:

This will remove any failed messages from a batch. Furthermore, dropping a message will propagate an acknowledgement (also known as "ack") upstream to the pipeline's input.

Let me know if this makes sense. Otherwise, I'm happy to also chat on the Discord if it's easier 😄

@bssyousefi
Copy link
Author

bssyousefi commented Jul 25, 2024

Thanks @gregfurman for the explanation. so I guess by nacking in bento, we mean not acknowledging the proper receipt to the input source. Am I right?
As a matter of fact, I want bento to process the record just once even if there was some error. However, what I'm looking for is a mechanism in which bento doesn't ack in case of error and also avoid replaying the record, so I can have a list of records in redis streams NOT acknowledged and pending.
I tried to call reject in output in order to nack the record in case of error and to avoid repeating the record with error I tried to use mapping: root = deleted() before reject to prevent bento from adding the record to pendingMsgs. However, it didn't work as I wished. I guess the flow stops after dropping the record by mapping command and reject is not being called.
Is there any solution to avoid calling ack in case of error without repeating the record?

@gregfurman
Copy link
Collaborator

@bssyousefi Hey. Sorry for the bad correspondence here. Will re-look at this again tomorrow!

@gregfurman gregfurman self-assigned this Jan 5, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants