Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[draft; not-ready] receive: Sloppy quorum implementation #7106

Closed

Conversation

douglascamata
Copy link
Contributor

@douglascamata douglascamata commented Jan 30, 2024

  • I added CHANGELOG entry for this change.
  • Change is not relevant to the end user.

Changes

  • Adds a simple sloppy quorum implementation based on Receive: Implement a sloppy quorum #5809. The feature can be enabled with the CLI flag --receive.sloppy-quorum. The control of the amount of times the algorithm looks for a new node to replicate the request to is done using the --receive.sloppy-retries-limit flag. Currently the sloppy quorum logic happens in two places: when finding a peer connection and when writing, with independent retries counters.

One important detail is that there's no implementation of what's known as "hinted handoff". This means that writes that end up "slipping" will never be sent back to the original place where they should be. The reasons for this are:

  • Receive's sensitive to the metrics timestamp. It's not easy to write data that's not "from this instant in time" into a node. This requires enabling out-of-order features that are still experimental, adding a system to keep track of these, attempt the handoff periodically and (maybe?) delete this data from the temporary owning node's TSDB. It's a lot of complication and complexity.
  • Thanos' query path doesn't really care where the data is. If there's no requirement for such complicated system as mentioned above we are better off avoiding it.

Verification

@douglascamata douglascamata changed the title receive: Sloppy quorum implementation [draft; not-ready] receive: Sloppy quorum implementation Jan 30, 2024
Copy link
Member

@GiedriusS GiedriusS left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could add a small paragraph to the docs outlining how this works? It would make reviewing easier

@douglascamata
Copy link
Contributor Author

@GiedriusS sure thing! I will write when the implementation is ready. It'll take a while still.

So far I am experimenting with different approaches to see what's the simplest way to achieve sloppy quorum that I'm happy with and passes tests (current and new sloppy quorum related tests).

PR's open only to make it easier for me to ask some people's opinion on some things.

Signed-off-by: Douglas Camata <[email protected]>
When we reuse an endpoint for the same replica index we get out of order errors.

Signed-off-by: Douglas Camata <[email protected]>
@douglascamata
Copy link
Contributor Author

After investigating a bunch of different approaches to implement sloppy quorum in Thanos, I learned that the per-series replication makes things difficult.

Unfortunately I will pause my efforts towards this one more time. Let's see if the 3rd time will be the charm. I'm pushing my latest attempt to have it public. 😄

Below you can find a few things I tried and why they failed. Hopefully it can inspire and/or help someone else that want to work or collaborate on this.

Send the request as-is to another hashring member

When I try to send a request that failed to another member of the hashring, it is very likely to already have received part of the series from another replicated request.

Why is it so likely? Due to improvements in spread of data done to all hashring algorithms (ketama and hashmod).

Redistribute the remaining series between hashring members

Has the same problesm as described above.

Redistribute the series to a subslice of the original hashring

In this approach I used a denylist to create subslices of the original hashring where all the members that already received at least one request were ignored.

Very often this results in an empty subslice due to the great spread of data mentioned above.

When it doesn't, it has the same issues as the point above: the new node to get a given set of series very likely already has some series from that request.

Conclusion

Implementing sloppy quorum in Thanos will be a high effort work. It might require that we intercalate some small related features (i.e. allowing forward requests to be retried through configuration) and refactors to get to a point where sloppy quorum can be safely introduced with an implementation that is as simple as we can come up with.

@mfoldenyi
Copy link

@douglascamata Could you clarify something for me? You mentioned this:

When I try to send a request that failed to another member of the hashring, it is very likely to already have received part of the series from another replicated request.

Could you elaborate why this was a problem? AFAIK this should not be a problem at all.

Assume a situation with 3 receivers with replication factor 3. Any piece of data coming into the system would want to end up on all 3 of these receivers, so regardless which receiver it lands on initially, that receiver will send it off to the other 2, and it will write it down to its own tsdb locally.

If both the other receivers happen to be unavailable long enough to fail all internal retries, the quorum of 2 will not be achieved, and a 5xx response will be returned, which will cause an eventual retry from the client with the exact same data. (or possibly with new data added on top) If in the meantime at least one of the previously unavailable receivers come back up, the retry will be processed appropriately with a 200 response having reached the quorum of 2, despite the data already being (fully or partially) available in the receiver that it processed it on the first try.

This is standard operation which may or may not cause a warning to be printed in the logs.

What problems did you experience with this during your testing?

@douglascamata
Copy link
Contributor Author

@mfoldenyi it's been a few months since I worked on this, so let's see if I remember something about it. 🤣

The complication is that we are replicating series, not requests. Let's name these 3 receivers in the example A, B, and C -- to make it easier to understand. Consider also that only A and B are up and the replication factor is 3.

Receiver A was the first one to get a request with a few series. It does then the distribution for the replication.

Because we are hashing series, A does a local write. But it's not writing everything with replica 1. It writes replica 1 of some series, replica 2 of some series, replica 3 of some series.

Now, A sends the remote write to all the series that should go to B. As before, this remote write contains replica 1 for some series, 2 for some, 3 for some.

At the end, A has to decide what to do with the series that would be written on C. There comes the issue, I think: because the distribution is not perfect, there might be series left that didn't achieve the replication quorum of 2. It's not possible to guarantee that a request with 3 series will create a perfect replication over 3 nodes even if all the 3 were healthy.

The more nodes you have down in your hashring, even when you still have 2 up to achieve quorum, the more likely this is to happen. For instance, if you had 2 receivers up out of 4 total maybe a given series that for one success write would have its other 2 writes on the 2 nodes that are down.

I hope I'm remembering correctly, but I might be wrong.

@mfoldenyi
Copy link

Ok, so I may be understanding what you meant, so let me rephrase and tell me if I got it or not:

For now lets assume we have 4 receivers, A, B, C, D, replication factor 3.
A receives a request with series S1.
S1 maps to B, C, D

Assume B, C and D is unavailable. (in this state, no requests should ever succeed)
A does NOT write down S1, since it is not among the targets.
A sends S1 to B and fails. It tries to pick a non default target node (no point picking B, C or D as they are attempted anyway), which leaves itself (A). A writes down S1 locally and increments successful writes by 1.
A sends S1 to C and fails. Unaware of the replication attempt already writing to A, it also picks itself as the only potential fallback. It writes down S1 locally and increments successful writes by 1. (This will still work, AFAIK tsdb will simply not care and overwrite/ignore the data)
A sends S1 to D and fails. The same thing happens as for the other attempts

We end up getting 3 successful writes, when in reality all we did was just 1, the local write on A.

So above said, the problem you mean is that we cannot identify which writes are "real writes" that do count towards the quorum, multiple replication attempts could pick the same "fallback target" and report the same write as multiple successes?

If this is so, can we not just refactor the quorum check to collect target names instead of counts? We could then check uniqueness in the list to check for quorum.

@douglascamata
Copy link
Contributor Author

@mfoldenyi this scenario that you just showed is one of the few additional complications -- a very good one, I would say. If we are just thinking out loud, it's an easy problem to solve: you have to ensure that every single series is written to at least 2 different nodes. It might not be as easy in code, but definitely doable.

But let me draw your attention to the some other complications. You said this:

For now lets assume we have 4 receivers, A, B, C, D, replication factor 3.
A receives a request with series S1.
S1 maps to B, C, D

  1. This is an ideal scenario. S1 mapped exactly to 3 different receivers (B, C, and D). What if it had been mapped to B, C, and C? There's nothing to stop this from happening, even though the probability might be low. With enough series in a request, the probability raises. At what point would it become truly significant? I don't know. We need some extra logic to avoid overlaps. It would then solve the issue you just mentioned of everything ending up in the receiver A.

  2. What if S1-R1 (series 1, replication number 1) gets written in B, then S1-R2 in C, but S1-R3 fails to write to D. Then by chance S1-R3 gets remapped to B or C: it'll fail to write by definition, because the series' already there. It would only write successfully on A. It doesn't make sense to waste time trying to write it to B or C, we should send it to A directly. So lots of logic is needed to make per series decisions.

  3. Now extrapolate a bit: a request with tens of thousands of series, going into a group of tens of receivers. Because there are many series, statistically speaking a single receiver will replicate series to the whole cluster. Also statistically speaking, with enough series and nodes we might have a nearly perfect distribution. If one request fails, what do you do with it? All the other nodes in the cluster will contain at least one of the series in the HTTP request that failed, so sending the request to the next node in the hashring will fail for some series. Sounds easy to think about this: just take what failed to write, remap them, and try a few times (that's why I had the sloppy retry parameter at some point). But what if by chance 2 receivers are down and you lose quorum of a given series? What's the response of the remote write request: a partial write or an error because quorum wasn't achieved for a single series?

There are lots of decisions and tradeoffs to be made. The code starts to become complicated and things get difficult. There's a lot of work to do and many of these details only come up as you start to see tests behaving weirdly.

@mfoldenyi
Copy link

So there is one aspect which I have assumed to be one way, and you are saying otherwise, which I so far believe to not be true. Specifically this part:

What if S1-R1 (series 1, replication number 1) gets written in B, then S1-R2 in C, but S1-R3 fails to write to D. Then by chance S1-R3 gets remapped to B or C: it'll fail to write by definition, because the series' already there.

My understanding so far was, that this is not a problem. S1-R3 would successfully write to both B and C if attempted, despite them already having the samples. If this was not so, we would pretty almost always end up in a "not possible to recover from" scenario whenever we are processing a retry:

Take a request R1 with 2 series:
S1 successfully replicates to all the mapped nodes
S2 fails the quorum
->
R1 is failed with a 5xx error due to a failing quorum on S2, but the client does not get this information. To them the entire R1 is rejected.

R1 gets resubmitted by the client:
S1 successfully replicates to all the mapped nodes (which are the same ones as before, and they each already had the data, but you say this is by definition rejected)
S2 successfully replicates to all the mapped nodes
->
R1 gets accepted as a whole, a 200 is returned

If resending the same data to the same node was a problem, then this use case would not work, more over, any request containing S1 arriving in the future would by default be rejected, which most of the time would completely kneecap all of the retries arrive, since in my experience most of the time requests are rejected due to only a small subset of the series within not reaching quorum. (eg 2 nodes of 90 are down, 90% of requests get rejected, but retries work fine right after with 90 nodes)

Am I not seeing something that makes this situation somehow different to what you are talking about?

If this is indeed a problem, we could still limit the solution the AZ aware ketama hashring, where we would have the ability to find nodes that are definitely not normal targets of the data (since at max 1 request would land in one AZ, so all other nodes in the AZ are good fallback candidates)

@douglascamata
Copy link
Contributor Author

If resending the same data to the same node was a problem

Resending the same series, with the same timestamp, and the same value to the same node will 100% fail.

This happens today when a request is partially written. Often it will result in Prometheus retrying the request non-stop because there will be always an error as part of it was already written. It only stops when Prometheus decides to drop the request from its retry queue.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants