Sampling backend implementation #709
Conversation
Are you planning to add network functionality to the sampler backend in this PR?

There is no networking in the backend. Everything is piped from the service itself.
}
Some(sampling_message) = sampling_message_stream.next() => {
    Self::handle_sampling_message(sampling_message, &mut sampler).await;
    // cleanup not on time samples
    sampler.prune();
As I understand it, pruning is now done only after some interaction with the service. Could there be a situation in which sampling was triggered more than `old_blobs_check_duration` ago, no other interactions happen with the service for some reason, and the block producer requests an outdated validated blob (because pruning only runs after a message is handled)? Maybe we should have a ticker for pruning?
A ticker is probably a good idea. Let's do this: delegate ticker creation to the backend (we need to add the method to the trait), because it is the one that should know about timings; then, in the service, we prune on each tick. Nice catch @bacv!
Yes, that was the initial idea with the thread in the backend, which we then moved out to the service. A combination of the two sounds like the best approach. If I am understanding correctly: a ticker in the backend, but pruning executed from the service.
The backend is just responsible for building the ticker (as it holds the proper configuration) and for pruning. The service will call prune when the ticker ticks, as it owns the main loop.
Specifically, the backend needs to return an `Interval`.
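A minimal sketch of that split, assuming a tokio-based service loop; the struct, field, and method names below are illustrative stand-ins rather than the PR's actual types:

```rust
use std::time::Duration;
use tokio::time::{interval, Interval};

// Illustrative backend type; the real sampler holds more state than this.
struct Sampler {
    old_blobs_check_interval: Duration,
}

impl Sampler {
    // The backend builds the ticker because it owns the timing configuration.
    fn prune_interval(&self) -> Interval {
        interval(self.old_blobs_check_interval)
    }

    // Drop pending samples that have been around for too long.
    fn prune(&mut self) {
        // ...
    }
}

// The service owns the main loop, so it drives pruning on every tick.
async fn run(mut sampler: Sampler) {
    let mut prune_ticker = sampler.prune_interval();
    loop {
        tokio::select! {
            _ = prune_ticker.tick() => sampler.prune(),
            // ... other branches: sampling messages, lifecycle events, etc.
        }
    }
}
```

This way the timing value (`old_blobs_check_duration` in the discussion above) stays inside the backend, and the service never needs to know the timing details.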
Looks good! The tests help to follow the flow.
@@ -23,6 +23,7 @@ tracing = "0.1"
thiserror = "1.0.63"
rand = "0.8.5"
rand_chacha = "0.3.1"
chrono = "0.4.38"
Nit: this is not needed anymore.
Looking good overall.
println!("{}", self.settings.num_samples); | ||
println!("{}", ctx.subnets.len()); |
Leftover?
You can instrument the function if you want to debug it.
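As a reference for that suggestion, a hedged sketch of instrumenting the handler with the `tracing` crate in place of the `println!` calls; the function and type names are placeholders, not the PR's actual ones:

```rust
use tracing::{debug, instrument};

// Placeholder types standing in for the backend's real ones.
struct Settings { num_samples: u16 }
struct SamplingContext { subnets: Vec<u16> }

// `#[instrument]` opens a span per call; the fields below replace the
// ad-hoc `println!` output and are attached to every event emitted inside.
#[instrument(skip_all, fields(num_samples = settings.num_samples, subnets = ctx.subnets.len()))]
fn handle_sampling_success(settings: &Settings, ctx: &SamplingContext) {
    debug!("sampling success");
    // ...
}
```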
// TODO: there is no logging at all here. Should we not do some logging?
// Or can we assume that it is done on an upper level by the clients?
We can add whatever logging we need here. It will then be wrapped in the service context.
Added just two `tracing::info` calls.
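For illustration only, the kind of events this could mean; the exact messages and fields added in the PR may differ (`?blob_id` records the value with its `Debug` impl):

```rust
// Hypothetical events, not the exact ones added in the PR.
fn log_sampling_progress(blob_id: [u8; 32], subnets: usize) {
    tracing::info!(?blob_id, "initiating sampling for blob");
    tracing::info!(?blob_id, subnets, "sampling for blob terminated successfully");
}
```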
    }
}

async fn next_prune_interval(&self) -> Interval {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpick: `prune_interval`
match ctx.subnets.len() {
    // sampling of this blob_id terminated successfully
    len if len == self.settings.num_samples as usize => {
        self.validated_blobs.insert(blob_id);
        // cleanup from pending samplings
        self.pending_sampling_blobs.remove(&blob_id);
    }
    len if len > self.settings.num_samples as usize => {
        unreachable!("{}", "more subnets than expected after sampling success!");
    }
    // do nothing if smaller
    _ => {}
}
Suggested change:
- match ctx.subnets.len() {
-     // sampling of this blob_id terminated successfully
-     len if len == self.settings.num_samples as usize => {
-         self.validated_blobs.insert(blob_id);
-         // cleanup from pending samplings
-         self.pending_sampling_blobs.remove(&blob_id);
-     }
-     len if len > self.settings.num_samples as usize => {
-         unreachable!("{}", "more subnets than expected after sampling success!");
-     }
-     // do nothing if smaller
-     _ => {}
- }
+ // sampling of this blob_id terminated successfully
+ if ctx.subnets.len() == self.settings.num_samples as usize {
+     self.validated_blobs.insert(blob_id);
+     // cleanup from pending samplings
+     self.pending_sampling_blobs.remove(&blob_id);
+ }
I am not much of a fan of hiding away situations which should not happen, but which are critical if not fatal when they do... why would we do that? Yes, the code is nicer. Is there any place in the rest of the codebase which makes this error impossible?
As we discussed privately: it is not possible for it to be more than the expected threshold. We increment one by one (ensured by `&mut self`); when we have enough, the queue is empty and the blob is considered validated. Something else may happen, but not that.
// TODO: This also would be an error which should never happen, but what if a client starts
// init_sampling of a blob which is already pending? Or worse, which already is validated?
// Should we not therefore return an error here?
Imo this object should be idempotent. Why? Because we cannot ensure that only a single service will try to start sampling or check how things are going (even if for now that is the case). So maybe here we could add a check and just start sampling if it wasn't pending already. Wdyt?
Currently, if already sampling, this would result in the network adapter resending SampleRequest messages. Out of band you suggested an `enum` return signaling the state, so that's what I did here.
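A minimal sketch of that idea, with illustrative names and field types (the real sampler and its collections in the PR may look different):

```rust
use std::collections::{HashMap, HashSet};

type BlobId = [u8; 32];

// Hypothetical state signal returned by the backend when sampling is requested.
pub enum SamplingState {
    /// Sampling was actually started for this blob.
    Init,
    /// The blob is already being sampled; nothing new is sent.
    Tracking,
    /// The blob was already validated; nothing to do.
    Terminated,
}

// Simplified stand-in for the PR's sampler; field types are assumptions.
pub struct Sampler {
    validated_blobs: HashSet<BlobId>,
    pending_sampling_blobs: HashMap<BlobId, ()>,
}

impl Sampler {
    // Idempotent entry point: calling it twice for the same blob must not
    // make the network adapter resend SampleRequest messages.
    pub fn init_sampling(&mut self, blob_id: BlobId) -> SamplingState {
        if self.validated_blobs.contains(&blob_id) {
            return SamplingState::Terminated;
        }
        if self.pending_sampling_blobs.contains_key(&blob_id) {
            return SamplingState::Tracking;
        }
        self.pending_sampling_blobs.insert(blob_id, ());
        // ... trigger the actual sample requests here
        SamplingState::Init
    }
}
```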
// TODO: in most of these error cases we can't get the blob_id from the error
// Shouldn't the error contain that?
// We can of course stop tracking that blob_id in the backend via timeout,
// which we want to have anyways, but could it be nicer to remove it here too,
// by calling the handler_sampling_error method?
This is a specific error. We can try to get the blob_id from the error; that way we short-circuit the sampling (calling the error handling method with the blob_id). Otherwise we wait till the time expires. It can be done here or in a different PR.
Steps would be (see the sketch after this list):
- Create a `blob_id` method for the sampling error which returns an `Option<BlobId>`.
- Call the method here and, if it is `Some(blob_id)`, call the error handling method.
- Log it otherwise.
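A hedged sketch of those steps; the error variants, field names, and the `handle_sampling_error` stub are assumptions for illustration only:

```rust
type BlobId = [u8; 32];

// Simplified stand-in for the real sampling error type.
pub enum SamplingError {
    Deserialize { blob_id: BlobId, reason: String },
    Protocol { blob_id: BlobId, reason: String },
    Io(std::io::Error),
}

impl SamplingError {
    // Step 1: expose the blob_id when the variant carries one.
    pub fn blob_id(&self) -> Option<BlobId> {
        match self {
            Self::Deserialize { blob_id, .. } | Self::Protocol { blob_id, .. } => Some(*blob_id),
            Self::Io(_) => None,
        }
    }
}

// Placeholder for the backend's error handling method.
fn handle_sampling_error(blob_id: BlobId) {
    // remove `blob_id` from pending samplings, etc.
    let _ = blob_id;
}

// Steps 2 and 3: short-circuit when possible, otherwise log and let pruning clean up.
fn on_sampling_error(error: SamplingError) {
    if let Some(blob_id) = error.blob_id() {
        handle_sampling_error(blob_id);
    } else {
        tracing::error!("sampling error without a blob_id; cleanup deferred to pruning");
    }
}
```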
Added in this PR
Made a tiny refactor, I'm merging this after CI is 🟢
Good job! Thanks!
This PR adds the sampling backend implementation to the sampling service.