
Support multiple producers in redis streams #2581

Open
ganeshvanahalli wants to merge 11 commits into master
Conversation

ganeshvanahalli (Contributor):

This PR allows our redis stream pub-sub implementation to support multiple producers per stream.

Motivation for this change: previously, only one producer per stream was effectively supported. Although two producers were "technically" allowed to connect to the same redis stream, they couldn't make meaningful use of it: if both producers requested a response for the same request, the consumers would treat those requests as unique and solve the same request twice, wasting resources.

Previously, each consumer maintained a heartbeat key in redis and set it periodically to confirm it was active. Producers used these keys to determine which messages were stuck in the PEL (pending entries list: messages that are claimed by a consumer but not yet acknowledged) and to reinsert them into the stream. This PR simplifies that whole process by using XAUTOCLAIM, which automatically lets any consumer claim a message that is in the PEL and has been idle for a minimum amount of time.

Testing Done

Test cases covering the different scenarios (multiple producers, consumer failures, reclaiming) have been added to exercise this design.

Resolves NIT-2685

@cla-bot cla-bot bot added the s Automatically added by the CLA bot if the creator of a PR is registered as having signed the CLA. label Aug 15, 2024
@ganeshvanahalli ganeshvanahalli marked this pull request as ready for review August 15, 2024 12:49
eljobe
eljobe previously approved these changes Aug 20, 2024
pubsub/consumer.go (outdated review thread, resolved)
magicxyyz (Contributor) left a comment:

Great PR! Added a few comments: some suggested improvements, some error handling, and a note on the stream trimming (which might need a more complex solution).

Resolved review threads:
  • pubsub/consumer.go (outdated)
  • pubsub/consumer.go (outdated)
  • pubsub/producer.go (outdated)
  • validator/validation_entry.go
  • pubsub/consumer.go (outdated)
  • pubsub/consumer.go (outdated)
eljobe
eljobe previously approved these changes Aug 29, 2024
eljobe (Member) left a comment:

LGTM

Consumer: c.id,
MinIdle: c.cfg.IdletimeToAutoclaim, // Minimum idle time for messages to claim (in milliseconds)
Stream: c.redisStream,
Start: decrementMsgIdByOne(pendingMsgs[idx].ID),
Contributor:

why do we need to decrement the ID?

ganeshvanahalli (Contributor, author) replied:

I've tested this locally and can confirm that we definitely need to decrement the start by 1, because for some reason XAUTOCLAIM is non-inclusive of start. This is odd and contrary to its docs (https://redis.io/docs/latest/commands/xautoclaim/):

    Internally, the command begins scanning the consumer group's Pending Entries List (PEL) from <start> and filters out entries having an idle time less than or equal to min-idle-time.
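For reference, the decrement helper named in the diff has to respect the `<ms>-<seq>` format of redis stream IDs, including the boundary where the sequence part is 0. A minimal self-contained sketch (the PR's actual implementation may differ):

```go
package main

import (
	"fmt"
	"math"
	"strconv"
	"strings"
)

// decrementMsgIdByOne returns the stream ID immediately preceding a redis
// stream ID of the form "<ms>-<seq>", so that passing it as Start to
// XAUTOCLAIM (observed to be exclusive of Start) still covers the entry.
func decrementMsgIdByOne(id string) string {
	parts := strings.Split(id, "-")
	if len(parts) != 2 {
		return id // unexpected format; pass through unchanged
	}
	ms, errMs := strconv.ParseUint(parts[0], 10, 64)
	seq, errSeq := strconv.ParseUint(parts[1], 10, 64)
	if errMs != nil || errSeq != nil {
		return id
	}
	if seq > 0 {
		return fmt.Sprintf("%d-%d", ms, seq-1)
	}
	if ms > 0 {
		// seq was 0: wrap to the highest sequence of the previous millisecond
		return fmt.Sprintf("%d-%d", ms-1, uint64(math.MaxUint64))
	}
	return "0-0" // already the smallest valid ID
}

func main() {
	fmt.Println(decrementMsgIdByOne("1724000000000-5")) // 1724000000000-4
	fmt.Println(decrementMsgIdByOne("1724000000000-0")) // 1723999999999-18446744073709551615
}
```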

}).Result()
if err != nil {
log.Error("error from xautoclaim", "err", err)
}
magicxyyz (Contributor) commented on Aug 29, 2024:

It would be good to either:

  • add a comment saying that if we fail to auto-claim, we don't retry and just try to consume a new message
    (e.g. other client(s) front-ran us to the messages within (scan-start PEL index, scan-start PEL index + 10], and it is possible that there are other timed-out messages).
    This option might not be optimal: combined with the random choice of timed-out pending entries, there is a risk of starving one of the timed-out entries. For that starvation to happen, consumer failures would need to be quite frequent (compared to the time required to process a single message), so that XPENDING always returns more than one entry. That might be fine for the redis validation use case, but might bite us in the future when pubsub is used for some other component.

  • or check here if len(messages) == 0 and exit early with some defined error (something like EAGAIN) that we can check on the Consume caller side. That way the caller could retry after some delay, the same as when there are no new messages:

    s.StopWaiter.CallIteratively(func(ctx context.Context) time.Duration {
        req, ackNotifier, err := c.Consume(ctx)
        if err != nil {
            log.Error("Consuming request", "error", err)
            return 0
        }
        if req == nil {
            // There's nothing in the queue.
            return time.Second
        }

This option guarantees that retries are handled first and that new messages are consumed only after all failed ones have been retried.

ganeshvanahalli (Contributor, author) replied on Sep 4, 2024:

I am for option 1 rather than option 2, because option 2 would be similar to trying to XAUTOCLAIM every message from XPENDING. The reason we went with a random pick is to strike a balance between choosing idle PEL entries and new messages, with first preference given to idle PEL entries.

Retrying idle PEL entries again and again, until success or until XPENDING is exhausted, might have the unintended consequence of starving new messages. The number of idle PEL entries should in any case be small and shouldn't regularly increase; it would only lead to starvation if the rate of addition of entries to the idle PEL exceeds the rate of removal (which equals N/[avg processing time], where N is the number of workers).

if !errors.Is(err, redis.Nil) {
log.Error("Error from XpendingExt in getting PEL for auto claim", "err", err, "penindlen", len(pendingMsgs))
}
} else if len(pendingMsgs) > 0 {
Contributor:

Maybe for a future PR, but we might want to filter pendingMsgs by pendingMsg.RetryCount: log an error and skip if we find a message that was retried too many times. That would prevent a situation where stream throughput is degraded because some specific messages constantly trigger consumer crashes.
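The suggested filter could look roughly like this. The struct mirrors the relevant fields of go-redis's XPendingExt result, and maxMsgRetries is a hypothetical cap (a real implementation would likely make it configurable):

```go
package main

import "fmt"

// pendingEntry mirrors the fields of go-redis's redis.XPendingExt that matter
// here; RetryCount is the delivery counter redis tracks per PEL entry.
type pendingEntry struct {
	ID         string
	RetryCount int64
}

// maxMsgRetries is a hypothetical cap on deliveries before a message is
// treated as poisonous and skipped.
const maxMsgRetries = 5

// filterRetryable splits pending entries into ones still worth claiming and
// ones delivered too many times, so a poison message cannot degrade stream
// throughput by repeatedly crashing consumers. The caller would log and skip
// the dropped entries.
func filterRetryable(pending []pendingEntry) (kept, dropped []pendingEntry) {
	for _, p := range pending {
		if p.RetryCount > maxMsgRetries {
			dropped = append(dropped, p)
			continue
		}
		kept = append(kept, p)
	}
	return kept, dropped
}

func main() {
	pending := []pendingEntry{{"1-0", 2}, {"1-1", 9}, {"2-0", 1}}
	kept, dropped := filterRetryable(pending)
	fmt.Println(len(kept), len(dropped)) // 2 1
}
```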

trimmed, trimErr = p.client.XTrimMinID(ctx, p.redisStream, minId).Result()
} else {
trimmed, trimErr = p.client.XTrimMaxLen(ctx, p.redisStream, 0).Result()
// XDEL on the consumer side already deletes acked messages (marks them as deleted) but doesn't reclaim the memory; XTRIM helps reclaim this memory under normal conditions
Contributor:

Depending on how much we prioritize freeing memory early vs. reducing the computation required, I think XTRIM might not be needed here: we could save some additional complexity and some processing on the redis server side.

As per the XDEL docs:

Eventually if all the entries in a macro-node are marked as deleted, the whole node is destroyed and the memory reclaimed.

ganeshvanahalli (Contributor, author) replied:

I think we should prioritize freeing memory early, even though XDEL would eventually free it anyway sometime after all the elements in a macro-node are marked as deleted. The main reason: if one or a few consumers fail to delete their acked entries, memory will grow indefinitely, since acked entries don't show up in the PEL, and there is really no way for us to detect this other than watching the logs on the consumer side.
Besides, I think having a deletion mechanism on both the producer and the consumer side is a bit safer. I do agree with your point that we don't have to run XTRIM as regularly as CheckResultInterval; maybe a 5*CheckResultInterval duration sounds reasonable?

pubsub/consumer.go (resolved review thread)
3 participants