opt: optimize cluster identification #3309
base: main
Conversation
@nazar-pc additionally, during my actual development, this part of the code is not easy to test, and some scenarios are hard to cover (like the farm `FingerprintUpdated` scenario). At the very least, I need to start three components (NATS, controller, and farmer/cache) to do so. I was thinking maybe I could first submit a PR that extracts the update logic for caches and farms and covers enough test cases?
Thanks for the contribution and sorry it took this long to read into it. This is certainly the right direction, but it will cause issues for the way maintenance of caches and farms is done (it prevents the loop with `select!` from actually looping quickly, which was carefully avoided before).
I only left comments on the cache, but similar comments apply to the farmer side as well.
I also don't fully understand why the thing we are trying to address here was sort of added back at the end; I'm confused.
And please rebase after further updates (if any) and squash changes to the same part of the codebase; it'll be easier to review that way.
pub struct ClusterCacheDetailsRequest;

impl GenericStreamRequest for ClusterCacheDetailsRequest {
    const SUBJECT: &'static str = "subspace.cache.*.details";
Please add a comment just like in other places
Suggested change:
    /// `*` here stands for cache ID
    const SUBJECT: &'static str = "subspace.cache.*.details";
Since the subject's instance for the farmer's request could be either a cluster farmer ID or a single farm ID, I have added comments to the farmer's subject as well.
pub enum CacheId {
    /// Cache ID
    Ulid(Ulid),
}
This doesn't have to use `Ulid` (though it doesn't hurt, it is just more verbose to implement encoding/decoding), and I'd call it `ClusterCacheInstance` (and similarly for the farmer), in which case there is no need to rename things in many places, and it will prevent confusion between `PieceCacheId` and `CacheId`.
Just to confirm, do you mean something like using `first_cache_id.to_string()` as the `ClusterCacheInstance`? Or perhaps `ClusterCacheInstance(CacheId)`?
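For illustration only, here is a minimal sketch of the newtype reading of that suggestion; the derives, the `Display` impl, and the choice of `Ulid` as the inner type are assumptions rather than anything fixed by the comments above (what the wrapper should hold is exactly the open question in this thread):

```rust
// Sketch only: the inner representation (Ulid, PieceCacheId, or a plain string)
// is the open question discussed above.
use std::fmt;

use ulid::Ulid;

/// Identifier of a cache instance within the cluster, used as the `*` part of
/// NATS subjects such as `subspace.cache.*.details`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct ClusterCacheInstance(Ulid);

impl fmt::Display for ClusterCacheInstance {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Display::fmt(&self.0, f)
    }
}
```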
known_caches.update_cache(
    cache_id,
    max_num_elements,
} = identify_message;
if known_caches.update(cache_id, max_num_elements, nats_client) {
    info!(
        %cache_id,
        "New cache discovered, scheduling reinitialization"
    );
    scheduled_reinitialization_for.replace(
        Instant::now() + SCHEDULE_REINITIALIZATION_DELAY,
    );
} else {
    trace!(
        %cache_id,
        "Received identification for already known cache"
    );
}
    &mut scheduled_reinitialization_for,
    nats_client,
    async {
        nats_client
            .stream_request(
                &ClusterCacheDetailsRequest,
                Some(&cache_id.to_string()),
            )
            .await
            .inspect_err(|error| warn!(
                %error,
                %cache_id,
                "Failed to request farmer farm details"
            ))
            .ok()
    },
).await
There are a couple of issues with this:
- You can't `.await` here because it'll prevent other branches of `select!` from making progress in the meantime, which will likely make other caches disconnect, causing further issues.
- It is a confusing API: a future is created here that is actually awaited inside, with more work done internally.

`KnownCaches` is designed as a simple state machine; it shouldn't do any async work, only update its state based on inputs.
What should happen is a background task like `farms_to_add_remove` + `farm_add_remove_in_progress` that manages sequential addition of farms (it doesn't actually need to be globally sequential, just for individual farms, but it was easier to implement that way; parallelizing it might be a good improvement though).
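To make the suggested shape concrete, here is a minimal, self-contained sketch of that pattern, not the controller's actual code (`fetch_cache_details`, `CacheDetails`, and the identify channel are placeholders): the identify branch of `select!` only enqueues work, while a separate branch drives at most one in-flight details request through a `Fuse::terminated()` slot, so the loop itself never blocks.

```rust
// Sketch only: placeholder types and queue, assuming the `futures` crate.
use std::collections::VecDeque;

use futures::channel::mpsc;
use futures::future::{Fuse, FusedFuture, FutureExt};
use futures::{pin_mut, select, StreamExt};

struct CacheDetails; // placeholder for the real details type

async fn fetch_cache_details(cache_id: String) -> Option<CacheDetails> {
    // In the real code this would be the NATS stream request for this cache
    let _ = cache_id;
    None
}

async fn controller_loop(mut identify_rx: mpsc::UnboundedReceiver<String>) {
    let mut caches_to_add = VecDeque::<String>::new();
    let cache_add_in_progress = Fuse::terminated();
    pin_mut!(cache_add_in_progress);

    loop {
        // Start the next queued addition only once the previous one has finished
        if cache_add_in_progress.is_terminated() {
            if let Some(cache_id) = caches_to_add.pop_front() {
                cache_add_in_progress.set(fetch_cache_details(cache_id).fuse());
            }
        }

        select! {
            maybe_cache_id = identify_rx.next() => {
                // Identify branch: only record pending work, never await network calls here
                match maybe_cache_id {
                    Some(cache_id) => caches_to_add.push_back(cache_id),
                    None => break,
                }
            }
            details = cache_add_in_progress => {
                // Details arrived in the background; update the known-caches state here
                let _ = details;
            }
        }
    }
}
```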
Sorry, I hadn’t paid much attention to this part of the details before. Yes, this would cause the `select!` to be blocked. I think I need to adjust the logic here carefully (including for the farmer). Also, do you think I should add an extra flag to indicate that this `ClusterCache` (or `ClusterFarmer`) is currently being updated, to prevent duplicate update tasks from being added? Based on the current identification interval, such a situation shouldn’t occur, but I’d like your input.
The farmer right now doesn't need any flags because it queues things and processes them sequentially. As mentioned, that is not the most efficient, but it is one of the simplest ways to implement it. I think it'll be fine to do the same here.
On the farmer side, since we already have such infrastructure, we might be able to reuse it and parallelize processing of multiple farms belonging to the same farmer (since the add/remove unit is now the farmer, not the farm).
My suggestion would be to not change cache and farmer at the same time. Start with cache; once that is done and you have experience with how it works, proceed with the farmer, which is a bit more involved but conceptually similar.
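For context, the per-farmer parallelization mentioned above is often expressed with a `FuturesUnordered` over the individual farm requests. The sketch below only illustrates that idea under assumed placeholder names (`FarmId`, `FarmDetails`, `fetch_farm_details`); it is not the PR's implementation.

```rust
// Illustration only: `FarmId`, `FarmDetails`, and `fetch_farm_details` are placeholders.
use futures::stream::{FuturesUnordered, StreamExt};

type FarmId = String;
struct FarmDetails;

async fn fetch_farm_details(farm_id: FarmId) -> FarmDetails {
    // Stand-in for the per-farm details request
    let _ = farm_id;
    FarmDetails
}

/// Fetch details for all farms belonging to one farmer concurrently rather than one by one.
async fn fetch_all_farm_details(farm_ids: Vec<FarmId>) -> Vec<FarmDetails> {
    farm_ids
        .into_iter()
        .map(fetch_farm_details)
        .collect::<FuturesUnordered<_>>()
        .collect()
        .await
}
```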
Yes, let me adjust the code. Once I've completed my modifications, I'll take care of those annoying merges.
This looks good to me, once Nazar's comments have been addressed
Force-pushed from b633d33 to beb798c
No changes were made; I just squashed changes through a rebase. Additionally, I removed `ClusterCacheIdentifyPieceCacheBroadcast` (and its farmer counterpart): they were reintroduced in a separate commit, and I simply dropped that commit. Nazar's comments will be addressed in subsequent commits.
Force-pushed from d368b04 to 4246b58
Force-pushed from 4246b58 to cda20e5
I’ve rearranged the commit order to make squashing easier later. @nazar-pc I finished the cache implementation (it’s relatively straightforward), so you can review it for any potential issues. 91b3dd4: when a new cache appears, the system will collect the stream in the background and update `KnownCaches` once it’s done. Before making changes to the farmer, perhaps I could submit a separate PR to add or remove farms in parallel? It doesn’t look too complex right now (and may even simplify the implementation).
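To gloss the description of 91b3dd4: collecting the details stream in the background and reporting back to the loop that owns `KnownCaches` could look roughly like the sketch below. The names and the channel are placeholders chosen for illustration, not the commit's actual code.

```rust
// Sketch with placeholder types; assumes a tokio runtime and a channel back to
// the loop that owns the known-caches state.
use tokio::sync::mpsc;

struct CacheDetails; // placeholder

async fn collect_cache_details(cache_id: String) -> Vec<CacheDetails> {
    // Stand-in for draining the NATS details stream for this cache
    let _ = cache_id;
    Vec::new()
}

fn spawn_cache_details_collection(
    cache_id: String,
    results: mpsc::UnboundedSender<(String, Vec<CacheDetails>)>,
) {
    tokio::spawn(async move {
        let details = collect_cache_details(cache_id.clone()).await;
        // The receiving side updates `KnownCaches` when this message arrives
        let _ = results.send((cache_id, details));
    });
}
```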
Force-pushed from cda20e5 to 75b0755
Second attempt to close #2900
The first commit is purely mechanical; it simply renames `cache_id` to `piece_cache_id`.
For the cache, everything is straightforward; it's just a matter of recording the corresponding relationships in the controller. However, things are a little bit more complicated for the farmer. First, we check the identify message to see if the farmer is newly discovered and whether the fingerprint has changed. Based on the results, we decide whether to use the stream to fetch details; a rough sketch of that decision follows below.
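As a reading aid for the farmer flow just described, the decision amounts to something like the following; the type and variant names are placeholders, not the PR's actual code.

```rust
// Placeholder sketch of the decision made when a farmer identify message arrives.
enum FarmerAction {
    /// Farmer not seen before: fetch full farm details over the stream
    FetchDetails,
    /// Known farmer whose fingerprint changed: re-fetch details
    RefreshDetails,
    /// Known farmer with an unchanged fingerprint: just refresh the last-seen time
    KeepAlive,
}

fn classify_identify(known: bool, fingerprint_changed: bool) -> FarmerAction {
    match (known, fingerprint_changed) {
        (false, _) => FarmerAction::FetchDetails,
        (true, true) => FarmerAction::RefreshDetails,
        (true, false) => FarmerAction::KeepAlive,
    }
}
```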
The final commit is to ensure compatibility with previous approaches.
(I’m really sorry, actually, I finished it a long time ago, but I forgot about it and left it in a corner.)
Code contributor checklist: