Fair sharding #2798
Replies: 2 comments 2 replies
-
Hi, there are some upvotes here, but still no discussion. The thing seems highly doable, and I already have a working implementation of the algorithm as an outside service to serve as reference. If this is something we want to pursue, we could maybe start a architecture decision document ? |
Beta Was this translation helpful? Give feedback.
-
We have been doing quite a bit of thinking and debating over topics like this internally. Should have something to share fairly soon. |
Beta Was this translation helpful? Give feedback.
-
Problem Overview
There is currently an ADR in progress about Ordered Consumer Groups. The current proposal enables consuming messages in parallel as long as messages sharing a common subject or header are processed sequentially on the same consumer.
My experience with this (14 years on the topic as an architect at D-Edge/Accor) shows that this solution is acceptable as long as traffic relatively homogenous between shards/partition keys. But it's problematic as soon as some shards can randomly be published in burst, especially in the presence of slow consumers (when consumers are fast, things are too easy). When a burst occurs on a shard key, messages published for other shard keys after the burst can be impacted and delayed until all messages from the burst are processed.
This is typical with static consistent hashing (here an example with 3 partitions, allocated using a mod3):
partitions 1 and 2 will quickly be empty, but the Message M3 for partition key 3, is blocked behind all messages for partition key 0. It would be fair to process M3 just after the first M0 message, as it would be compatible with sequentiallity by partition key, and ordering in a partition key.
Current solution
I implemented a fair sharding strategy on MSMQ that has been in use with success for several years, and managed to port it to Nats/Jets Stream.
Messages are persisted to a stream configured with Work Queue retention.
A dispatcher service starts an ephemeral consumer in HeadersOnly mode, with an infinite AckWait.
The dispatcher maintains a queue map, initially empty, associating an in-memory queue of message sequence numbers (and message ack subject) to a partition key. It also maintains a queue of partition keys, initially empty too, as a ring buffer.
When receiving a message, the dispatcher extract the partition key (it can be a part of the subject, or the value of a header) from the message and checks whether there is already a queue for the partition key in the queue map.
The dispatcher will use these in memory structures to dispatch the work to workers (subscribers). When workers start, they signal themselves over a known subject, indicating a worker specific subject to be notified of work to do. The dispatcher enrolls the worker in a list. If the dispatcher is starting after the workers, it signal itself on a known subject, and workers respond with their specific subject.
When a message arrived, when a workers is enrolled, or when a message has just been processed, the dispatcher tries to allocate a message to a worker. It first check if there is any available worker (enrolled and not already busy). The it pops the first partition key from the ring buffer queue. It then find the in memory queue in the queue map and pops the first message from the queue. It then sends the sequence number and the ack subject to the worker.
The worker receive the information reply with an ack (to the dispatcher) to indicate that it received it and is ready to start work. The worker emit a GET request using the stream name and the sequence number to retrieve the message body. It then process the message. Once done, it sends a +ACK on the transmitted ack subject to remove the message for JS work queue.
When sending a message to the worker, the dispatcher starts to also listen on the message ack subject to be notified of the ack. Once acked, the dispatcher can check whether there are other messages in the in-memory queue. If true, it re-enqueues the partition key in the ring buffer. If not, it removes the in-memory queue from the queue map, and doesn't re-enqueue the partition key in the ring buffer.
Since partition key are dequeued from the ring buffer while work for this partition key is in progress, it ensures that messages for this partition key cannot be processed concurrently. For a single partition key, messages are dequeued from the in-memory queue, so they are processed in order. Once work has been done for a partition key, it is re-queued at the end of the ring buffer. It will have to wait for all other partition keys to be served before being served again.
the preceding sample, the dispatcher will contain 5 (0..4) in-memory queues, one for each partition key. The ring buffer will contain one instance of each partition key. It will first get the 0 partition key, dequeue the first M0 message and send it to a worker. If another worker is available, it will get the 1 partition key and and dequeue the M1 message. When M0 is processed, there is more work to do, and partition key 0 is re-queued to the end of the ring buffer. The next messages will be for partition key 2 and 3 and 4, and only after, 0 will be considered again. As there are no more messages for 1,2,3,4 the queues are removed from the queue map freeing some resources. The dispatcher will continue to distribute M0 messages sequentially.
Implementation in JetStream / nats-server
Implementing this sharding strategy inside JetStream would have several advantages.
Reduce message
The current implementation involves a lot of messages:
* Pushing message from JS to dispatcher
* Pushing sequence number from dispatcher to Worker
* Asking to JS messages body (request response)
* Sending the ack from Worker to JS and dispatcher
An implementation inside nats-server would directly send message with the body from JS to worker. And the worker would just ack the message as expected.
Reduce memory usage
The current implementation must keep track of messages in the service. All this information is already in jetstream. JetStream would just need to have in-memory queues of messages sequence numbers.
Make it available for all clients
The current implementation has been developed for .Net. Using it in Node, Php, or Java requires to rewrite all the logic in these languages.
With an implementation in jets stream, Client just have to indicate they're available as a worker for a stream, and wait for messages from JS with the work to do, and ack it when done.
Beta Was this translation helpful? Give feedback.
All reactions