Rewrite State Sync, from a giant state machine to proper async code. #12172
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@            Coverage Diff             @@
##           master   #12172      +/-   ##
==========================================
- Coverage   71.83%   71.66%    -0.18%
==========================================
  Files         827      834        +7
  Lines      166639   166332      -307
  Branches   166639   166332      -307
==========================================
- Hits       119713   119201      -512
- Misses      41709    41914      +205
  Partials     5217     5217

Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry.
Looks great!
store: Store,
epoch_manager: Arc<dyn EpochManagerAdapter>,
runtime: Arc<dyn RuntimeAdapter>,
network_adapter: AsyncSender<PeerManagerMessageRequest, PeerManagerMessageResponse>,
nit: Why not use type PeerManagerAdapter here, like everywhere else?
Changed to PeerManagerAdapter.
)
.await?;
let state_root = header.chunk_prev_state_root();
if runtime_adapter.validate_state_part(
We can remove validate_state_part from the runtime and make it part of some util; it doesn't have any need to be in the runtime.
That is out of scope for this refactoring, which is already large :)
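For the record, the suggestion is just to hoist the check into a free function whose inputs fully determine the result; a toy sketch, where all the types and the body are placeholders rather than nearcore's real API:

```rust
// Placeholder types; the real ones would come from near_primitives.
pub struct StateRoot(pub [u8; 32]);
pub struct PartId {
    pub idx: u64,
    pub total: u64,
}

/// A free-standing `validate_state_part`: everything it needs is in its
/// arguments, so nothing forces it to live on the RuntimeAdapter.
pub fn validate_state_part(state_root: &StateRoot, part_id: &PartId, data: &[u8]) -> bool {
    // Only basic sanity checks here; the real function would verify that the
    // trie nodes in `data` hash up to `state_root` for this part.
    let _ = state_root;
    part_id.idx < part_id.total && !data.is_empty()
}
```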
chain/client/src/client.rs
Outdated
@@ -2579,7 +2576,7 @@ impl Client {
    sync_hash: CryptoHash,
    state_sync_info: &StateSyncInfo,
    me: &Option<AccountId>,
-) -> Result<HashMap<u64, ShardSyncDownload>, Error> {
+) -> Result<HashMap<u64, ShardSyncStatus>, Error> {
ShardSyncDownload and ShardSyncDownloadView are no longer used anywhere and can be deleted.
Nit: update the comment above.
chain/client/src/client.rs
Outdated
@@ -2465,34 +2464,40 @@ impl Client {

for (sync_hash, state_sync_info) in self.chain.chain_store().iterate_state_sync_infos()? {
    assert_eq!(sync_hash, state_sync_info.epoch_tail_hash);
    let network_adapter = self.network_adapter.clone();

    let shards_to_split = self.get_shards_to_split(sync_hash, &state_sync_info, &me)?;
Ugh, I think Alex merged in some change here. Will need to check if we need shards_to_split at all.
network_adapter,
self.runtime_adapter.store().clone(),
self.epoch_manager.clone(),
self.runtime_adapter.clone(),
I'm not super happy about passing the runtime into state sync. At the top level, it doesn't seem like there should be any dependency between the two. Could we check whether it's possible to decouple them? Maybe as a follow-up to this PR.
Yeah, we can look into that afterwards.
return_if_cancelled!(cancel);

// Finalize; this needs to be done by the Chain.
*status.lock().unwrap() = ShardSyncStatus::StateApplyFinalizing;
Nice! With this setup, it should ideally be possible to convert ShardSyncStatus to a string instead, and then we could keep adding more statuses with more information?
I didn't want to touch that part because it was also used in a couple of other places. Maybe I can clean it up after.
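To make the string-status idea concrete, a self-contained toy version (the names here are placeholders, not the PR's types): the shared status becomes free-form text, so new, more detailed statuses can be reported without extending a shared enum.

```rust
use std::sync::{Arc, Mutex};

// Free-form status shared between the sync task and whoever reports progress.
type ShardStatus = Arc<Mutex<String>>;

fn set_status(status: &ShardStatus, text: &str) {
    *status.lock().unwrap() = text.to_string();
}

fn main() {
    let status: ShardStatus = Arc::new(Mutex::new("pending".to_string()));
    // Arbitrarily detailed messages can be set without touching any enum.
    set_status(&status, "apply finalizing (3/4 parts applied)");
    println!("{}", status.lock().unwrap());
}
```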
/// would be blocked by the computation, thereby not allowing computation of other
/// futures driven by the same driver to proceed. This function respawns the future
/// onto the FutureSpawner, so the driver of the returned future would not be blocked.
fn respawn_for_parallelism<T: Send + 'static>(
Instead of this, is it not possible to make the compute-intensive tasks small enough that they don't block the driver? This looks a bit odd overall, where we are effectively "transferring" the future to a different spawner; it feels awkward.
That's basically what it does; the thing is that tokio_stream::iter never spawns anything; it only awaits multiple futures on the same driver.
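To make the pattern concrete, here is a minimal sketch using plain tokio rather than nearcore's FutureSpawner, so the names and the spawning mechanism are stand-ins: the heavy future is moved onto its own task, and the caller only awaits a lightweight future for the result, so a driver awaiting many such futures (e.g. via tokio_stream::iter) is never blocked.

```rust
use std::future::Future;
use tokio::sync::oneshot;

// Respawn a compute-heavy future onto its own task; the returned future only
// waits for the result, so the caller's driver stays responsive.
fn respawn_for_parallelism<T: Send + 'static>(
    f: impl Future<Output = T> + Send + 'static,
) -> impl Future<Output = T> {
    let (sender, receiver) = oneshot::channel();
    tokio::spawn(async move {
        // Ignore the error case where the caller dropped the receiver.
        let _ = sender.send(f.await);
    });
    async move { receiver.await.expect("spawned task dropped the sender") }
}
```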
chain/client/src/sync/state/mod.rs
Outdated
/// and then in `run` we process them.
header_validation_queue: UnboundedReceiver<StateHeaderValidationRequest>,
chain_finalization_queue: UnboundedReceiver<ChainFinalizationRequest>,
chain_finalization_sender: UnboundedSender<ChainFinalizationRequest>,
Why do we need to store the sender as part of StateSync?
This is removed; replaced with AsyncSender.
}
Err(TryRecvError::Empty) => entry.get().status(),
},
Entry::Vacant(entry) => {
Could we encapsulate this into a function like self.start_state_sync_for_shard for better readability?
There is already the run_state_sync_for_shard function, and the code here isn't really any useful logic, just boilerplate.
chain_finalization_sender: UnboundedSender<ChainFinalizationRequest>,

/// There is one entry in this map for each shard that is being synced.
shard_syncs: HashMap<(CryptoHash, ShardId), StateSyncShardHandle>,
Wait, where and how are we adding new entries into this?
We do a self.shard_syncs.entry(key) at the top, and if it's vacant we insert something into the entry.
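For context, this is the standard HashMap entry pattern; a self-contained toy version (all names are placeholders, not the PR's actual types) showing how a vacant entry triggers starting a new per-shard sync, while an occupied one is only queried:

```rust
use std::collections::{hash_map::Entry, HashMap};

struct Handle {
    status: String,
}

// Look up the per-key handle; start and insert one if it does not exist yet.
fn status_for_key(syncs: &mut HashMap<u64, Handle>, key: u64) -> String {
    match syncs.entry(key) {
        // Already running: just report its current status.
        Entry::Occupied(entry) => entry.get().status.clone(),
        // Not running yet: start it (here: just build a handle) and insert it.
        Entry::Vacant(entry) => {
            let handle = Handle { status: "started".to_string() };
            entry.insert(handle).status.clone()
        }
    }
}

fn main() {
    let mut syncs = HashMap::new();
    println!("{}", status_for_key(&mut syncs, 1)); // "started"
    println!("{}", status_for_key(&mut syncs, 1)); // reuses the existing entry
}
```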
chain/client/src/sync/state/mod.rs
Outdated
}

/// Processes the requests that the state sync module needed the Chain for.
fn process_chain_requests(&mut self, chain: &mut Chain) {
I'm slightly wary about having this setup for the sync work... While this is objectively better for our use case here, we have a pattern where we send an actix message to the client to handle everything that should be synchronous in the client, and this sort of breaks that... I can't think of better ways or alternatives, though.
Changed to AsyncSender.
sync_status: shards_to_split.clone(),
download_tasks: Vec::new(),
computation_tasks: Vec::new(),
},
BlocksCatchUpState::new(sync_hash, *epoch_id),
)
});

// For colour decorators to work, they need to printed directly. Otherwise the decorators get escaped, garble output and don't add colours.
nit: This comment is no longer relevant
@@ -2079,13 +2044,21 @@ impl ClientActorInner {

if block_hash == sync_hash {
    // The first block of the new epoch.
    if let Err(err) = self.client.chain.validate_block(&block) {
Currently we see a lot of "Received an invalid block during state sync" spam during state sync, because the node doesn't know how to validate blocks at the head of the chain. I think it makes sense to only validate the blocks that state sync is specifically looking for.
part_id: *part_id,
},
);
let state_value = PendingPeerRequestValue { peer_id: None, sender };
In the handling for NetworkRequests::StateRequestPart we select a specific peer from which to request the part, so it should be possible to store that and verify that the response comes back from the expected peer. However, it might be a bit ugly to pass the selected peer id back here from the network side of things, and I expect to redo how the state headers work soon, so I am fine with just leaving this as-is for now.
Yeah.. this part is awkward indeed. I also don't know what to do about it right now.
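For reference, one way the check could look once the selected peer id is threaded through; the types and handler below are placeholders for illustration, not the actual nearcore code:

```rust
// Placeholder types standing in for the real nearcore ones.
#[derive(Clone, PartialEq, Eq, Debug)]
struct PeerId(String);

struct PendingPeerRequestValue<T> {
    // `Some(peer)` once the network side reports which peer was asked;
    // `None` keeps today's behavior of accepting a response from anyone.
    peer_id: Option<PeerId>,
    sender: tokio::sync::oneshot::Sender<T>,
}

// Only forward the part if it came from the peer we actually asked; otherwise
// keep the pending request around and wait for the real response.
fn on_state_part_response<T>(
    pending: PendingPeerRequestValue<T>,
    from: &PeerId,
    part: T,
) -> Option<PendingPeerRequestValue<T>> {
    let from_expected_peer =
        pending.peer_id.as_ref().map_or(true, |expected| expected == from);
    if from_expected_peer {
        let _ = pending.sender.send(part);
        None // request fulfilled
    } else {
        Some(pending) // unexpected sender: keep waiting
    }
}
```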
@@ -1100,7 +1100,14 @@ impl PeerActor {
.map(|response| PeerMessage::VersionedStateResponse(*response.0)),
PeerMessage::VersionedStateResponse(info) => {
    //TODO: Route to state sync actor.
seems outdated
let downloader = Arc::new(StateSyncDownloader {
    clock,
    store: store.clone(),
    preferred_source: peer_source,
At the moment we are in an ugly situation with state headers: a node needs to be tracking all shards to serve headers for any shard, but nodes no longer track all shards, and the strategy for obtaining headers from the network is to request them from direct peers of the node at random. It is a remnant of the times when every peer tracked every shard, and it needs to be rewritten entirely.
Before this PR, if an external source was available we would just directly get the headers from it without any attempts to get it from the network. It looks like we are changing that now and will try the network first for headers. I think it should be OK; just giving a heads up that there is a behavior change hidden here.
You mean that before the PR, even if we configured downloading from peers, we would always fetch the header from the external source? Hmm. OK.
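To make the new ordering concrete, a minimal sketch; the function shape and error type here are placeholders, not the StateSyncDownloader API:

```rust
// Header fetch ordering after this PR: ask the preferred (peer) source first,
// and only fall back to the configured external source if that fails, instead
// of going straight to the external source as before.
async fn fetch_header(
    try_network: impl std::future::Future<Output = Result<Vec<u8>, String>>,
    try_external: impl std::future::Future<Output = Result<Vec<u8>, String>>,
) -> Result<Vec<u8>, String> {
    match try_network.await {
        Ok(header) => Ok(header),
        Err(_) => try_external.await,
    }
}
```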
/// headers and parts in parallel for the requested shards, but externally, all that it exposes
/// is a single `run` method that should be called periodically, returning that we're either
/// done or still in progress, while updating the externally visible status.
pub struct StateSync {
Btw, is the async part compatible with testloop?
It is. There's nothing that is incompatible. It's removed anyway.
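As a rough illustration of the interface described in that doc comment (a simplified stand-in, not the real StateSync type): the owner keeps calling run periodically until it reports completion, and the internally updated status is what gets surfaced externally.

```rust
enum SyncProgress {
    InProgress,
    Done,
}

struct StateSync {
    remaining_parts: u32,
}

impl StateSync {
    fn run(&mut self) -> SyncProgress {
        // Internally this would poll the per-shard download/apply futures and
        // update the externally visible status; here we just count down.
        if self.remaining_parts == 0 {
            SyncProgress::Done
        } else {
            self.remaining_parts -= 1;
            SyncProgress::InProgress
        }
    }
}

fn main() {
    let mut sync = StateSync { remaining_parts: 3 };
    // The caller (e.g. the client) keeps calling `run` until it reports Done.
    while let SyncProgress::InProgress = sync.run() {
        // ...do other work between calls...
    }
}
```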
So @saketh-are @shreyan-gupta I addressed some of the issues and I'm just going to merge this PR as it is right now, because the longer it takes, the more merge conflicts I get. Then I'll follow up to clean up some of the things mentioned in the review comments.
Approving to unblock merging, assuming @saketh-are and @shreyan-gupta have already reviewed this.
This rewrites state sync. All functionality is expected to continue to work without any protocol or database changes.
See the top of state/mod.rs for an overview.
State sync status is now available on the debug page; an example: