Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

archive: Refactor archive_storage method into subscription #6483

Merged
merged 81 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
81 commits
Select commit Hold shift + click to select a range
e17a555
archive/api: Add stroage diff method
lexnv Oct 9, 2024
4101ee6
storage/events: Add storage events and test serialization
lexnv Oct 9, 2024
d781d67
archive: Add initial implementation for storage diff
lexnv Oct 9, 2024
60e1a93
archive: Handle modified and delete cases
lexnv Oct 9, 2024
63da031
archive/events: Derive more impl for diff items
lexnv Oct 9, 2024
25b9e5a
archive: Deduplicate input keys
lexnv Oct 9, 2024
52cfaf7
archive: Move code to different module
lexnv Oct 9, 2024
ee6bed7
archive: Fetch value / hash or both
lexnv Oct 9, 2024
8948e74
events: Add subscription events for ArchiveStorageDiffEvent
lexnv Nov 4, 2024
fbfb0dc
archive: Make archive_storageDiff a subscription
lexnv Nov 4, 2024
f7b33ed
archive: Modify storageDiff to leverage subscription backpressure
lexnv Nov 4, 2024
f47db36
archive: Deduplicate items to a separate function
lexnv Nov 5, 2024
97546a4
archive/tests: Check deduplication
lexnv Nov 5, 2024
baa3dd0
archive: Fix deduplication shortest key
lexnv Nov 5, 2024
4683f6b
archive/tests: Check complex deduplication
lexnv Nov 5, 2024
36f59d2
archive: Simplify logic of trie iteration
lexnv Nov 5, 2024
a084f47
archive: Improve documentation
lexnv Nov 5, 2024
77cac50
archive/events: Add derive(Eq) to archive storage diff events
lexnv Nov 5, 2024
e2324a4
archive/tests: Check query for main trie under prefix
lexnv Nov 5, 2024
4b05913
archive: Add trace and debug logs for storage diff
lexnv Nov 5, 2024
83cb9b4
archive: Send StorageDiffDone for partial trie queries
lexnv Nov 5, 2024
85c810e
archive/tests: Check no changes between blocks
lexnv Nov 5, 2024
152d13b
archive/tests: Ensure added key
lexnv Nov 5, 2024
4f549bf
archive/tests: Extend test with interleaved values and hashes
lexnv Nov 5, 2024
d208068
rpc-v2: Use indexmap crate
lexnv Nov 5, 2024
369a4d2
archive: Move prevHash as last parameter to archive diff
lexnv Nov 5, 2024
a02cc84
archive: Preserve order of elements with indexMap
lexnv Nov 5, 2024
637b446
archive/tests: Check deleted keys
lexnv Nov 5, 2024
76efb18
events: Add common wrappers for construction
lexnv Nov 5, 2024
ca1fa20
archive: Propagate errors via events
lexnv Nov 5, 2024
03eefd7
archive: Check invalid parameters and events
lexnv Nov 5, 2024
9a1e792
archive: Fix clippy
lexnv Nov 5, 2024
dbbfce1
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 5, 2024
971026d
Add prdoc
lexnv Nov 5, 2024
2880058
Merge branch 'master' into lexnv/storage-diff
lexnv Nov 6, 2024
b774855
archive/storage: Optimize space used for saved keys
lexnv Nov 6, 2024
838504b
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 6, 2024
60abd9b
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 6, 2024
dc5f001
archive: Propagate all queries to handle_trie_queries
lexnv Nov 11, 2024
cdc4dc5
archive: Simplify process_events
lexnv Nov 11, 2024
de7cf79
archive: Add missing line
lexnv Nov 11, 2024
199b46d
archive: Refactor with FetchedStorage enum
lexnv Nov 11, 2024
3f4a4b8
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 11, 2024
f4ace06
archive: Refactor for optimal iteration cycles
lexnv Nov 11, 2024
55489c7
archive: Adjust code comments
lexnv Nov 12, 2024
4f94890
archive: Rename starts_with to belongs_to_query
lexnv Nov 12, 2024
a7e54a5
Update prdoc
lexnv Nov 12, 2024
5341227
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 12, 2024
25728a1
common/events: Transform StorageResult into StorageEvent
lexnv Nov 14, 2024
8860e7e
archive/api: Make archive_storage a subscription
lexnv Nov 14, 2024
533ecb9
rpc-v2: Rename ArchiveResult into ArchiveEvent
lexnv Nov 14, 2024
0849e8e
events: Add wrappers for ArchiveStorageEvent
lexnv Nov 14, 2024
a2f92b6
common/storage: Introduce a common storage subcription handler
lexnv Nov 14, 2024
7fec81a
archive/api: Remove pagination items from archive storage
lexnv Nov 14, 2024
b6523d2
common/events: Add constructor methods for clarity
lexnv Nov 14, 2024
bb65583
archive: Remove ArchiveStorage in favour of common archive object
lexnv Nov 14, 2024
1917672
archive: Remove config for storage parameters and use backpressure
lexnv Nov 14, 2024
702f521
archive/storage: Adjust testing to subscription based approach
lexnv Nov 14, 2024
751d8ff
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 26, 2024
3c2bb7c
archive: Remove variable
lexnv Nov 26, 2024
8f8c0c0
Update cargo lock
lexnv Nov 26, 2024
07a459c
archive: Mode deduplicate_storage_diff_items to inenr methods
lexnv Nov 26, 2024
a151bf3
archive/tests: Ensure other storage entries are not provided
lexnv Nov 26, 2024
a575b4a
archive/diff: Implement lexicographic_diff
lexnv Nov 27, 2024
caa75cc
archive/tests: Check lexicographic diff
lexnv Nov 27, 2024
7ff0bdb
Merge remote-tracking branch 'origin/master' into lexnv/storage-diff
lexnv Nov 27, 2024
c6e40a0
archive: Remove commented code and add documentation
lexnv Nov 27, 2024
a07b5c4
Update substrate/client/rpc-spec-v2/src/archive/api.rs
lexnv Nov 27, 2024
ae00327
archive: Remove unneeded lifetime
lexnv Nov 27, 2024
fd48434
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 27, 2024
736a5ad
Merge remote-tracking branch 'origin/lexnv/storage-diff' into lexnv/s…
lexnv Nov 27, 2024
791831f
archive: Check sink closed
lexnv Nov 27, 2024
3ddaf2d
archive: Refactor process methods
lexnv Nov 27, 2024
774b525
archive/tests: Adjust to new assumptions wrt StorageDone
lexnv Nov 27, 2024
4147442
archive: Rename StorageResult to Storage
lexnv Nov 27, 2024
c0f5ded
storage/events: Include child trie key
lexnv Nov 27, 2024
39aab7a
archive/tests: Adjust testing
lexnv Nov 27, 2024
b14a4bd
archive/tests: Check childtrie results
lexnv Nov 27, 2024
9676499
Merge remote-tracking branch 'origin/master' into lexnv/storage-sub
lexnv Nov 28, 2024
ff09c7e
archive/tests: Fix wrong merge
lexnv Nov 28, 2024
087f0e7
archive/events: Align with spec
lexnv Nov 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions substrate/client/rpc-spec-v2/src/archive/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@

use crate::{
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
MethodResult,
};
Expand Down Expand Up @@ -100,13 +99,17 @@ pub trait ArchiveApi<Hash> {
/// # Unstable
///
/// This method is unstable and subject to change in the future.
#[method(name = "archive_unstable_storage", blocking)]
#[subscription(
name = "archive_unstable_storage" => "archive_unstable_storageEvent",
unsubscribe = "archive_unstable_stopStorage",
item = ArchiveStorageEvent,
)]
fn archive_unstable_storage(
&self,
hash: Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult>;
);

/// Returns the storage difference between two blocks.
///
Expand Down
202 changes: 107 additions & 95 deletions substrate/client/rpc-spec-v2/src/archive/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@

use crate::{
archive::{
archive_storage::{ArchiveStorage, ArchiveStorageDiff},
error::Error as ArchiveError,
ArchiveApiServer,
archive_storage::ArchiveStorageDiff, error::Error as ArchiveError, ArchiveApiServer,
},
common::events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageResult,
PaginatedStorageQuery,
common::{
events::{
ArchiveStorageDiffEvent, ArchiveStorageDiffItem, ArchiveStorageEvent, StorageQuery,
},
storage::{QueryResult, StorageSubscriptionClient},
},
hex_string, MethodResult, SubscriptionTaskExecutor,
};
Expand Down Expand Up @@ -57,42 +57,12 @@ use tokio::sync::mpsc;

pub(crate) const LOG_TARGET: &str = "rpc-spec-v2::archive";

/// The configuration of [`Archive`].
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woo 🥇

pub struct ArchiveConfig {
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
pub max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
pub max_queried_items: usize,
}

/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
///
/// Note: this is identical to the `chainHead` value.
const MAX_DESCENDANT_RESPONSES: usize = 5;

/// The maximum number of queried items allowed for the `archive_storage` at a time.
///
/// Note: A queried item can also be a descendant query which can return up to
/// `MAX_DESCENDANT_RESPONSES`.
const MAX_QUERIED_ITEMS: usize = 8;

/// The buffer capacity for each storage query.
///
/// This is small because the underlying JSON-RPC server has
/// its down buffer capacity per connection as well.
const STORAGE_QUERY_BUF: usize = 16;

impl Default for ArchiveConfig {
fn default() -> Self {
Self {
max_descendant_responses: MAX_DESCENDANT_RESPONSES,
max_queried_items: MAX_QUERIED_ITEMS,
}
}
}

/// An API for archive RPC calls.
pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
/// Substrate client.
Expand All @@ -103,11 +73,6 @@ pub struct Archive<BE: Backend<Block>, Block: BlockT, Client> {
executor: SubscriptionTaskExecutor,
/// The hexadecimal encoded hash of the genesis block.
genesis_hash: String,
/// The maximum number of items the `archive_storage` can return for a descendant query before
/// pagination is required.
storage_max_descendant_responses: usize,
/// The maximum number of queried items allowed for the `archive_storage` at a time.
storage_max_queried_items: usize,
/// Phantom member to pin the block type.
_phantom: PhantomData<Block>,
}
Expand All @@ -119,18 +84,9 @@ impl<BE: Backend<Block>, Block: BlockT, Client> Archive<BE, Block, Client> {
backend: Arc<BE>,
genesis_hash: GenesisHash,
executor: SubscriptionTaskExecutor,
config: ArchiveConfig,
) -> Self {
let genesis_hash = hex_string(&genesis_hash.as_ref());
Self {
client,
backend,
executor,
genesis_hash,
storage_max_descendant_responses: config.max_descendant_responses,
storage_max_queried_items: config.max_queried_items,
_phantom: PhantomData,
}
Self { client, backend, executor, genesis_hash, _phantom: PhantomData }
}
}

Expand Down Expand Up @@ -260,47 +216,53 @@ where

fn archive_unstable_storage(
&self,
pending: PendingSubscriptionSink,
hash: Block::Hash,
items: Vec<PaginatedStorageQuery<String>>,
items: Vec<StorageQuery<String>>,
child_trie: Option<String>,
) -> RpcResult<ArchiveStorageResult> {
let items = items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
let pagination_start_key = query
.pagination_start_key
.map(|key| parse_hex_param(key).map(|key| StorageKey(key)))
.transpose()?;

// Paginated start key is only supported
if pagination_start_key.is_some() && !query.query_type.is_descendant_query() {
return Err(ArchiveError::InvalidParam(
"Pagination start key is only supported for descendants queries"
.to_string(),
))
}
) {
let mut storage_client =
StorageSubscriptionClient::<Client, Block, BE>::new(self.client.clone());

let fut = async move {
let Ok(mut sink) = pending.accept().await.map(Subscription::from) else { return };

Ok(PaginatedStorageQuery {
key,
query_type: query.query_type,
pagination_start_key,
let items = match items
.into_iter()
.map(|query| {
let key = StorageKey(parse_hex_param(query.key)?);
Ok(StorageQuery { key, query_type: query.query_type })
})
})
.collect::<Result<Vec<_>, ArchiveError>>()?;
.collect::<Result<Vec<_>, ArchiveError>>()
{
Ok(items) => items,
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let child_trie = child_trie
.map(|child_trie| parse_hex_param(child_trie))
.transpose()?
.map(ChildInfo::new_default_from_vec);
let child_trie = child_trie.map(|child_trie| parse_hex_param(child_trie)).transpose();
let child_trie = match child_trie {
Ok(child_trie) => child_trie.map(ChildInfo::new_default_from_vec),
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error.to_string()));
return
},
};

let storage_client = ArchiveStorage::new(
self.client.clone(),
self.storage_max_descendant_responses,
self.storage_max_queried_items,
);
let (tx, mut rx) = tokio::sync::mpsc::channel(STORAGE_QUERY_BUF);
let storage_fut = storage_client.generate_events(hash, items, child_trie, tx);

Ok(storage_client.handle_query(hash, items, child_trie))
// We don't care about the return value of this join:
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_storage_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}

fn archive_unstable_storage_diff(
Expand Down Expand Up @@ -337,24 +299,74 @@ where
// - process_events might encounter an error (if the client disconnected)
// - storage_fut might encounter an error while processing a trie queries and
// the error is propagated via the sink.
let _ = futures::future::join(storage_fut, process_events(&mut rx, &mut sink)).await;
let _ =
futures::future::join(storage_fut, process_storage_diff_events(&mut rx, &mut sink))
.await;
};

self.executor.spawn("substrate-rpc-subscription", Some("rpc"), fut.boxed());
}
}

/// Sends all the events to the sink.
async fn process_events(rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>, sink: &mut Subscription) {
while let Some(event) = rx.recv().await {
if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
/// Sends all the events of the storage_diff method to the sink.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little annoying to have two different functions for ArchiveStorageDiffEvent and for ArchiveStorageEvent but I have no better suggestion.

It's fine

async fn process_storage_diff_events(
rx: &mut mpsc::Receiver<ArchiveStorageDiffEvent>,
sink: &mut Subscription,
) {
loop {
tokio::select! {
_ = sink.closed() => {
return
},

maybe_event = rx.recv() => {
let Some(event) = maybe_event else {
break;
};

if event.is_done() {
log::debug!(target: LOG_TARGET, "Finished processing partial trie query");
} else if event.is_err() {
log::debug!(target: LOG_TARGET, "Error encountered while processing partial trie query");
}

if sink.send(&event).await.is_err() {
return
}
}
}
}
}

/// Sends all the events of the storage method to the sink.
async fn process_storage_events(rx: &mut mpsc::Receiver<QueryResult>, sink: &mut Subscription) {
loop {
tokio::select! {
_ = sink.closed() => {
break
}

maybe_storage = rx.recv() => {
let Some(event) = maybe_storage else {
break;
};

match event {
Ok(None) => continue,

Ok(Some(event)) =>
if sink.send(&ArchiveStorageEvent::result(event)).await.is_err() {
return
},

if sink.send(&event).await.is_err() {
return
Err(error) => {
let _ = sink.send(&ArchiveStorageEvent::err(error)).await;
return
}
}
}
}
}

let _ = sink.send(&ArchiveStorageEvent::StorageDone).await;
}
Loading
Loading