diff --git a/Cargo.lock b/Cargo.lock index 6163f3c145..19fad8f209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1561,6 +1561,7 @@ checksum = "2382f75942f4b3be3690fe4f86365e9c853c1587d6ee58212cebf6e2a9ccd101" name = "common-primitives" version = "0.0.0" dependencies = [ + "cid 0.11.1", "enumflags2", "frame-support", "frame-system", @@ -3941,6 +3942,7 @@ dependencies = [ "pallet-message-queue", "pallet-messages", "pallet-messages-runtime-api", + "pallet-migrations", "pallet-msa", "pallet-msa-runtime-api", "pallet-multisig", @@ -7780,19 +7782,18 @@ dependencies = [ name = "pallet-messages" version = "0.0.0" dependencies = [ - "cid 0.11.1", "common-primitives", "common-runtime", "frame-benchmarking", "frame-support", "frame-system", + "log", "multibase", + "pallet-migrations", "parity-scale-codec", "pretty_assertions", - "rand 0.9.1", "scale-info", "serde", - "serde_json", "sp-core", "sp-io", "sp-runtime", diff --git a/Cargo.toml b/Cargo.toml index 210161c68b..0db2406d54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ apache-avro = { version = "0.19.0", default-features = false } rand = "0.9.0" parking_lot = "0.12.1" lazy_static = { version = "1.5", features = ["spin_no_std"] } +multibase = { version = "0.9", default-features = false } +cid = { version = "0.11", default-features = false, features = ["alloc"] } # substrate wasm parity-scale-codec = { version = "3.7.4", default-features = false } @@ -65,6 +67,7 @@ pallet-authorship = { git = "https://github.com/paritytech/polkadot-sdk", tag = pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-collective = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-democracy = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } +pallet-migrations = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-multisig = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-preimage = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-scheduler = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } diff --git a/Makefile b/Makefile index e29e8672c2..f6ae827851 100644 --- a/Makefile +++ b/Makefile @@ -419,6 +419,7 @@ LOCAL_URI=ws://localhost:9944 WASM_PATH=./target/release/wbuild/frequency-runtime/frequency_runtime.wasm # Without the state from this minimal set of pallets, try-runtime panics when trying to validate multi-block migrations MINIMAL_PALLETS=ParachainSystem ParachainInfo System Timestamp Aura Authorship +TRY_RUNTIME_BUILD_TYPE=release .PHONY: check-onfinality-api-key check-onfinality-api-key: @@ -440,25 +441,30 @@ try-runtime-%-mainnet: URI := $(MAINNET_URI) try-runtime-%-mainnet: CHAIN := mainnet try-runtime-%-local: URI := $(LOCAL_URI) try-runtime-%-local: CHAIN := local +try-runtime-%-local: WASM_PATH=./target/debug/wbuild/frequency-runtime/frequency_runtime.wasm + build-runtime-paseo-testnet: FEATURES := frequency-testnet build-runtime-bridging-testnet: FEATURES := frequency-testnet,frequency-bridging build-runtime-mainnet: FEATURES := frequency build-runtime-westend-testnet: FEATURES := frequency-westend,frequency-bridging -build-runtime-local: FEATURES := frequency-no-relay,frequency-bridging +build-runtime-local: FEATURES := frequency-no-relay +build-runtime-local: TRY_RUNTIME_BUILD_TYPE := dev .PHONY: build-runtime-paseo-testnet build-runtime-westend-testnet build-runtime-mainnet build-runtime-local +build-runtime-local \ build-runtime-paseo-testnet \ build-runtime-westend-testnet \ build-runtime-mainnet: - cargo build --package frequency-runtime --release --features $(FEATURES),try-runtime --locked + cargo build --package frequency-runtime --profile ${TRY_RUNTIME_BUILD_TYPE} --features $(FEATURES),try-runtime --locked # # The 'try-runtime' targets can optionally be constrained to fetch state for only specific pallets. This is useful to # avoid unnecessarily fetching large state trees for pallets not under test. The list of pallets is: # Msa Messages StatefulStorage Capacity FrequencyTxPayment Handles Passkey Schemas -.PHONY: try-runtime-create-snapshot-paseo-testnet try-runtime-create-snapshot-westend-testnet try-runtime-create-snapshot-mainnet +.PHONY: try-runtime-create-snapshot-paseo-testnet try-runtime-create-snapshot-westend-testnet try-runtime-create-snapshot-mainnet try-runtime-create-snapshot-local +try-runtime-create-snapshot-local \ try-runtime-create-snapshot-paseo-testnet \ try-runtime-create-snapshot-westend-testnet \ try-runtime-create-snapshot-mainnet: check-try-runtime-installed check-onfinality-api-key @@ -470,7 +476,8 @@ try-runtime-upgrade-paseo-testnet \ try-runtime-upgrade-mainnet: try-runtime-upgrade-%: check-try-runtime-installed build-runtime-% try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 live --uri $(URI) -.PHONY: try-runtime-use-snapshot-paseo-testnet try-runtime-use-snapshot-mainnet +.PHONY: try-runtime-use-snapshot-paseo-testnet try-runtime-use-snapshot-mainnet try-runtime-use-snapshot-local +try-runtime-use-snapshot-local \ try-runtime-use-snapshot-paseo-testnet \ try-runtime-use-snapshot-mainnet: try-runtime-use-snapshot-%: check-try-runtime-installed build-runtime-% try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 snap --path $(CHAIN)-$(SNAPSHOT_PALLETS).state @@ -483,11 +490,11 @@ try-runtime-check-migrations-westend-testnet: try-runtime-check-migrations-%: ch .PHONY: try-runtime-check-migrations-local try-runtime-check-migrations-local: check-try-runtime-installed build-runtime-local - try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="pre-and-post" --disable-spec-version-check --disable-mbm-checks --no-weight-warnings live --uri $(URI) $(PALLET_FLAGS) + try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="pre-and-post" --disable-spec-version-check live --uri $(URI) $(PALLET_FLAGS) .PHONY: try-runtime-check-migrations-none-local try-runtime-check-migrations-none-local: check-try-runtime-installed build-runtime-local - try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="none" --disable-spec-version-check --disable-mbm-checks --no-weight-warnings live --uri $(URI) $(PALLET_FLAGS) + try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="none" --disable-spec-version-check live --uri $(URI) $(PALLET_FLAGS) # Pull the Polkadot version from the polkadot-cli package in the Cargo.lock file. # This will break if the lock file format changes diff --git a/common/primitives/Cargo.toml b/common/primitives/Cargo.toml index ed3c025610..f7ad44fa79 100644 --- a/common/primitives/Cargo.toml +++ b/common/primitives/Cargo.toml @@ -30,6 +30,7 @@ sp-runtime-interface = { workspace = true } libsecp256k1 = { workspace = true, features = ["hmac"] } log = "0.4.22" lazy_static = { workspace = true } +cid = { workspace = true } [features] default = ['std'] diff --git a/common/primitives/src/cid.rs b/common/primitives/src/cid.rs new file mode 100644 index 0000000000..ea8dbf12d7 --- /dev/null +++ b/common/primitives/src/cid.rs @@ -0,0 +1,158 @@ +#[cfg(test)] +use cid::multibase::Base; +use cid::{multibase, Cid}; +#[cfg(test)] +use frame_support::assert_ok; +use frame_support::ensure; +use sp_io::hashing::sha2_256; +use sp_runtime::Vec; + +/// Multihash type for wrapping digests (support up to 64-byte digests) +type Multihash = cid::multihash::Multihash<64>; + +/// SHA2-256 multihash code +const SHA2_256: u64 = 0x12; +/// BLAKE3 multihash code +const BLAKE3: u64 = 0x1e; + +/// List of hash algorithms supported by DSNP +const DSNP_HASH_ALGORITHMS: &[u64] = &[SHA2_256, BLAKE3]; + +/// Raw codec for CIDv1 (0x55) +const RAW: u64 = 0x55; + +/// Error enum for CID validation +#[derive(Debug, PartialEq)] +pub enum CidError { + /// Unsupported CID version + UnsupportedCidVersion, + /// Unsupported CID hash algorithm + UnsupportedCidMultihash, + /// Multibase decoding error + MultibaseDecodeError, + /// UTF-8 decoding error + Utf8DecodeError, + /// CID parsing error + InvalidCid, +} + +/// Computes a CIDv1 (RAW + SHA2-256 multihash) +pub fn compute_cid_v1(bytes: &[u8]) -> Option> { + let digest = sha2_256(bytes); + let mh = Multihash::wrap(SHA2_256, &digest).ok()?; + let cid = Cid::new_v1(RAW, mh); + Some(cid.to_bytes()) +} + +/// Validates a CID to conform to IPFS CIDv1 (or higher) formatting and allowed multihashes (does not validate decoded CID fields) +pub fn validate_cid(in_cid: &[u8]) -> Result, CidError> { + // Decode SCALE encoded CID into string slice + let cid_str: &str = core::str::from_utf8(in_cid).map_err(|_| CidError::Utf8DecodeError)?; + ensure!(cid_str.len() > 2, CidError::InvalidCid); + // starts_with handles Unicode multibyte characters safely + ensure!(!cid_str.starts_with("Qm"), CidError::UnsupportedCidVersion); + + // Assume it's a multibase-encoded string. Decode it to a byte array so we can parse the CID. + let cid_b = multibase::decode(cid_str).map_err(|_| CidError::MultibaseDecodeError)?.1; + let cid = Cid::read_bytes(&cid_b[..]).map_err(|_| CidError::InvalidCid)?; + ensure!(DSNP_HASH_ALGORITHMS.contains(&cid.hash().code()), CidError::UnsupportedCidMultihash); + + Ok(cid_b) +} + +#[cfg(test)] +const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4nga34vhp7t5orzwbwwytvp7ej44r5yhjzneanqwb5arcnvuvfwo2d4qgzyx5hymvto4"; +#[cfg(test)] +const DUMMY_CID_SHA256: &str = "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea"; +#[cfg(test)] +const DUMMY_CID_BLAKE3: &str = "bafkr4ihn4xalcdzoyslzy2nvf5q6il7vwqjvdhhatpqpctijrxh6l5xzru"; + +#[test] +fn validate_cid_invalid_utf8_errors() { + let bad_cid = vec![0xfc, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1]; + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected Utf8DecodeError"), + CidError::Utf8DecodeError + ); +} + +#[test] +fn validate_cid_too_short_errors() { + let bad_cid = "a".as_bytes().to_vec(); + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} + +#[test] +fn validate_cid_v0_errors() { + let bad_cid = "Qmxxx".as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected UnsupportedCidVersion"), + CidError::UnsupportedCidVersion + ); +} + +#[test] +fn validate_cid_invalid_multibase_errors() { + let bad_cid = "aaaa".as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected MultibaseDecodeError"), + CidError::MultibaseDecodeError + ); +} + +#[test] +fn validate_cid_invalid_cid_errors() { + let bad_cid = multibase::encode(Base::Base32Lower, "foo").as_bytes().to_vec(); + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} + +#[test] +fn validate_cid_valid_cid_sha2_256_succeeds() { + let cid = DUMMY_CID_SHA256.as_bytes().to_vec(); + assert_ok!(validate_cid(&cid)); +} + +#[test] +fn validate_cid_valid_cid_blake3_succeeds() { + let cid = DUMMY_CID_BLAKE3.as_bytes().to_vec(); + assert_ok!(validate_cid(&cid)); +} + +#[test] +fn validate_cid_invalid_hash_function_errors() { + let bad_cid = DUMMY_CID_SHA512.as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected UnsupportedCidMultihash"), + CidError::UnsupportedCidMultihash + ); +} +#[test] +fn validate_cid_not_valid_multibase() { + // This should not panic, but should return an error. + let bad_cid = vec![55, 197, 136, 0, 0, 0, 0, 0, 0, 0, 0]; + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected MultibaseDecodeError"), + CidError::MultibaseDecodeError + ); +} + +#[test] +fn validate_cid_not_correct_format_errors() { + // This should not panic, but should return an error. + let bad_cid = vec![0, 1, 0, 1, 203, 155, 0, 0, 0, 5, 67]; + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); + + // This should not panic, but should return an error. + let another_bad_cid = vec![241, 0, 0, 0, 0, 0, 128, 132, 132, 132, 58]; + assert_eq!( + validate_cid(&another_bad_cid).expect_err("Expected Utf8DecodeError"), + CidError::Utf8DecodeError + ); +} + +#[test] +fn validate_cid_unwrap_errors() { + // This should not panic, but should return an error. + let bad_cid = vec![102, 70, 70, 70, 70, 70, 70, 70, 70, 48, 48, 48, 54, 53, 53, 48, 48]; + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} diff --git a/common/primitives/src/lib.rs b/common/primitives/src/lib.rs index 1db06f6efa..65f8531e42 100644 --- a/common/primitives/src/lib.rs +++ b/common/primitives/src/lib.rs @@ -42,5 +42,7 @@ pub mod offchain; /// Benchmarking helper trait pub mod benchmarks; +/// CID support +pub mod cid; /// Signature support for ethereum pub mod signatures; diff --git a/common/primitives/src/messages.rs b/common/primitives/src/messages.rs index 91c5b374e2..86ba839383 100644 --- a/common/primitives/src/messages.rs +++ b/common/primitives/src/messages.rs @@ -1,6 +1,6 @@ #[cfg(feature = "std")] use crate::utils; -use crate::{msa::MessageSourceId, node::BlockNumber}; +use crate::{msa::MessageSourceId, node::BlockNumber, schema::SchemaId}; use parity_scale_codec::{Decode, Encode}; use scale_info::TypeInfo; #[cfg(feature = "std")] @@ -11,7 +11,7 @@ use alloc::{vec, vec::Vec}; #[cfg(feature = "std")] use utils::*; -/// A type for responding with an single Message in an RPC-call dependent on schema model +/// A type for responding with a single Message in an RPC-call dependent on schema model /// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length } /// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload } #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] @@ -20,7 +20,7 @@ pub struct MessageResponse { /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, /// indicating that the original source MSA is acting as its own provider. An id differing from that /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on - /// its behalf . + /// its behalf. pub provider_msa_id: MessageSourceId, /// Index in block to get total order. pub index: u16, @@ -45,6 +45,58 @@ pub struct MessageResponse { #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] pub payload_length: Option, } + +/// A type for responding with a single Message in an RPC-call dependent on schema model +/// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length } +/// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload } +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)] +pub struct MessageResponseV2 { + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Index in block to get total order. + pub index: u16, + /// Block-number for which the message was stored. + pub block_number: BlockNumber, + /// Message source account id (the original source). + #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] + pub msa_id: Option, + /// Serialized data in a the schemas. + #[cfg_attr( + feature = "std", + serde(with = "as_hex_option", skip_serializing_if = "Option::is_none", default) + )] + pub payload: Option>, + /// The content address for an IPFS payload in Base32. Will always be CIDv1. + #[cfg_attr( + feature = "std", + serde(with = "as_string_option", skip_serializing_if = "Option::is_none", default) + )] + pub cid: Option>, + /// Offchain payload length (IPFS). + #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] + pub payload_length: Option, + /// The SchemaId of the schema that defines the payload format + pub schema_id: SchemaId, +} + +impl Into for MessageResponseV2 { + fn into(self) -> MessageResponse { + MessageResponse { + provider_msa_id: self.provider_msa_id, + index: self.index, + block_number: self.block_number, + msa_id: self.msa_id, + payload: self.payload, + cid: self.cid, + payload_length: self.payload_length, + } + } +} + /// A type for requesting paginated messages. #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] #[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)] @@ -93,7 +145,7 @@ pub struct BlockPaginationResponse { } impl BlockPaginationResponse { - /// Generates a new empty Pagination request + /// Generates a new empty Pagination response pub const fn new() -> BlockPaginationResponse { BlockPaginationResponse { content: vec![], diff --git a/designdocs/schemas_protocols_intents.md b/designdocs/schemas_protocols_intents.md index 680dd5dcde..d95a9cfa0b 100644 --- a/designdocs/schemas_protocols_intents.md +++ b/designdocs/schemas_protocols_intents.md @@ -175,7 +175,7 @@ it's difficult or impossible to bifurcate the storage in the same way as the `me requiring a complete storage migration, new pages/items that are written can include a _storage version magic number_ in either the page or the item header. For `Paginated` storage, this value would precede the `PageNonce`; for `Itemized` storage the value would precede `payload_len`. The 'magic number' would be designed to be the same byte length as the -value currently a byte offset zero within the page/item, and to be a value such that conflict with a valid `nonce` or +value currently at byte offset zero within the page/item, and to be a value such that conflict with a valid `nonce` or `payload_len` would be highly unlikely, if not impossible. New structures would be defined, ie `PageV2` and `ItemizedItemV2`, and decoding values read from storage would need to diff --git a/pallets/messages/Cargo.toml b/pallets/messages/Cargo.toml index fbf0fe3aa3..d615d90d51 100644 --- a/pallets/messages/Cargo.toml +++ b/pallets/messages/Cargo.toml @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] parity-scale-codec = { workspace = true, features = ["derive"] } +log = { workspace = true } scale-info = { workspace = true, features = ["derive"] } # Substrate frame-benchmarking = { workspace = true, optional = true } @@ -24,16 +25,14 @@ sp-io = { workspace = true } sp-runtime = { workspace = true } # Frequency related dependencies common-primitives = { default-features = false, path = "../../common/primitives" } -cid = { version = "0.11", default-features = false } -multibase = { version = "0.9", default-features = false } +multibase = { workspace = true } [dev-dependencies] common-runtime = { path = '../../runtime/common', default-features = false } # Testing dependencies -rand = { workspace = true } +pallet-migrations = { workspace = true } pretty_assertions = { workspace = true } serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } [features] default = ['std'] diff --git a/pallets/messages/README.md b/pallets/messages/README.md index 46c490b694..dd5f3cad9c 100644 --- a/pallets/messages/README.md +++ b/pallets/messages/README.md @@ -4,16 +4,16 @@ Stores block-indexed data for a Schema using `OnChain` or `IPFS` payload locatio ## Summary -Messages stores metadata and message payload data for a set Schema. -Message payloads are meant for streaming data, where when the message was sent is the focus. +`messages` stores metadata and message payload data for a set Schema. +Message payloads are meant for streaming data, where _when_ the message was sent is the focus. Discovery is via the `MessagesInBlock` on new blocks or requesting messages from a block. -Retrieval is via RPC. +Retrieval is via a custom runtime API. ### Metadata vs Payload Messages have both metadata and payloads. The payload should always match the data structure or the message is considered invalid. -The metadata is the Block Number, Schema Id, and other data useful for discovering and organizing the payload information. +The metadata is the Block Number, Intent Id, and other data useful for discovering and organizing the payload information. ### Payload Options @@ -22,35 +22,36 @@ The metadata is the Block Number, Schema Id, and other data useful for discoveri ### Message Ordering -Messages are ordered by block number, and within each block, they follow a specific order based on their transaction sequence within that block. +Messages are ordered by block number and IntentId, and within each block, they follow a specific order based on their transaction sequence within that block. This order is immutable. ### Actions The Messages pallet provides for: -- Adding messages for a given schema -- Enabling the retrieval of messages for a given schema +- Adding messages for a given Intent +- Enabling the retrieval of messages for a given Intent ## Interactions ### Extrinsics | Name/Description | Caller | Payment | Key Events | Runtime Added | -| ---------------------------------------------------------------------------------------- | -------- | ------------------ | --------------------------------------------------------------------------------------------------------------------------------- | ------------- | +|------------------------------------------------------------------------------------------|----------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------|---------------| | `add_ipfs_message`
Add a message to a Schema with an `IPFS` payload location | Provider | Capacity or Tokens | [`MessagesInBlock`](https://frequency-chain.github.io/frequency/pallet_messages/pallet/enum.Event.html#variant.MessagesInBlock)\* | 1 | | `add_onchain_message`
Add a message to a Schema with an `ON_CHAIN` payload location | Provider | Capacity or Tokens | [`MessagesInBlock`](https://frequency-chain.github.io/frequency/pallet_messages/pallet/enum.Event.html#variant.MessagesInBlock)\* | 1 | -\* The `MessagesInBlock` may not occur more than once per block and does _not_ indicate which schema received messages. +\* The `MessagesInBlock` may occur at most once per block and does _not_ indicate which Intent(s) received messages. See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/pallet/struct.Pallet.html) for more details. ### State Queries -| Name | Description | Query | Runtime Added | -| --------------- | ----------------------------------------------------------------------------------------------------------------------------- | ------------ | ------------- | -| Get Messages v2 | _Suggested_: Use RPC instead of this storage directly. Storage for the messages by Block number, Schema Id, and Message Index | `messagesV2` | 61 | -| Get Messages v1 | Removed in Runtime 60 | `messages` | 1-60 | +| Name | Description | Query | Runtime Added | +|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|--------------|---------------| +| MessagesV3 | Suggested: Use custom runtime API instead of querying this storage directly.
Storage for the messages by Block Number, IntentId, and MessageIndex | `messagesV3` | ? | +| MessagesV2 | Removed in Runtime ?? | `messagesV2` | 61 | +| Messages | Removed in Runtime 60 | `messages` | 1-60 | See the [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/pallet/storage_types/index.html) for additional state queries and details. @@ -58,8 +59,19 @@ See the [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/ Note: May be restricted based on node settings and configuration. -| Name | Description | Call | Node Version | -| ------------------------- | ------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ | -| Get Messages by Schema Id | Fetch paginated messages for a specific Schema Id in the given block range for a given Schema Id | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+ | +| Name | Description | Call | Node Version | +|------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|--------------| +| Get Messages by Schema Id _(deprecated)_ | Fetch paginated messages for a specific Schema Id in the given block range for a given Schema Id
Deprecated in `v1.17.?`. Use custom Runtime API `get_messages_by_intent_id` instead | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+ | See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html) for more details. + +### Runtime API + +| Name | Description | Call | API Version Added | Runtime Added | +|-------------------------------------------------|---------------------------------------------------------------------------|-------------------------------|-------------------|---------------| +| Get Schema by Id _(deprecated)_ | Retrieves the schema for the given Schema Id | `getBySchemaId` | 1 | 1 | +| Get Messages by Schema and Block _(deprecated)_ | Retrieve the messages for a particular schema and block number | `getMessagesBySchemaAndBlock` | 1 | 1 | +| Get Messages by Intent and Block | Retrieve the messages for a particular intent and block range (paginated) | `getMessagesByIntentId` | 2 | ? | + +See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages_runtime_api/trait.MessagesRuntimeApi.html) for +more details. diff --git a/pallets/messages/src/benchmarking.rs b/pallets/messages/src/benchmarking.rs index 1e8ebd7d9e..1ad7c9b0c9 100644 --- a/pallets/messages/src/benchmarking.rs +++ b/pallets/messages/src/benchmarking.rs @@ -1,27 +1,26 @@ #![allow(clippy::expect_used)] use super::*; -#[allow(unused)] use crate::Pallet as MessagesPallet; use common_primitives::{ msa::{DelegatorId, ProviderId}, schema::*, }; use frame_benchmarking::v2::*; -use frame_support::{assert_ok, pallet_prelude::DispatchResult}; +use frame_support::{ + assert_ok, migrations::SteppedMigration, pallet_prelude::DispatchResult, weights::WeightMeter, +}; use frame_system::{pallet_prelude::BlockNumberFor, RawOrigin}; use sp_runtime::traits::One; extern crate alloc; use alloc::vec; -const SCHEMA_SIZE: u16 = 50; const IPFS_PAYLOAD_LENGTH: u32 = 10; const MAX_MESSAGES_IN_BLOCK: u32 = 500; -const ON_CHAIN_SCHEMA_ID: u16 = 16001; -// this value should be the same as the one used in mocks tests +// Any IPFS schema ID from the [pallet genesis file](../../../resources/genesis-schemas.json) will work const IPFS_SCHEMA_ID: u16 = 20; -fn onchain_message(schema_id: SchemaId) -> DispatchResult { +fn onchain_message(intent_id: IntentId, schema_id: SchemaId) -> DispatchResult { let message_source_id = DelegatorId(1); let provider_id = ProviderId(1); let payload = Vec::from( @@ -33,6 +32,7 @@ fn onchain_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), Some(message_source_id.into()), bounded_payload, + intent_id, schema_id, BlockNumberFor::::one(), )?; @@ -40,7 +40,7 @@ fn onchain_message(schema_id: SchemaId) -> DispatchResult { } /// Helper function to call MessagesPallet::::add_ipfs_message -fn ipfs_message(schema_id: SchemaId) -> DispatchResult { +fn ipfs_message(intent_id: IntentId, schema_id: SchemaId) -> DispatchResult { let payload = Vec::from("bafkreidgvpkjawlxz6sffxzwgooowe5yt7i6wsyg236mfoks77nywkptdq".as_bytes()); let provider_id = ProviderId(1); @@ -51,6 +51,7 @@ fn ipfs_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), None, bounded_payload, + intent_id, schema_id, BlockNumberFor::::one(), )?; @@ -58,24 +59,10 @@ fn ipfs_message(schema_id: SchemaId) -> DispatchResult { Ok(()) } -fn create_intent_and_schema(location: PayloadLocation) -> DispatchResult { - let intent_id = T::SchemaBenchmarkHelper::create_intent( - b"benchmark.intent".to_vec(), - location, - Vec::default(), - )?; - let _ = T::SchemaBenchmarkHelper::create_schema( - intent_id, - Vec::from(r#"{"Name": "Bond", "Code": "007"}"#.as_bytes()), - ModelType::AvroBinary, - location, - )?; - Ok(()) -} - #[benchmarks] mod benchmarks { use super::*; + use frame_support::{pallet_prelude::StorageVersion, traits::GetStorageVersion}; #[benchmark] fn add_onchain_message( @@ -83,12 +70,20 @@ mod benchmarks { ) -> Result<(), BenchmarkError> { let message_source_id = DelegatorId(2); let caller: T::AccountId = whitelisted_caller(); - let schema_id = ON_CHAIN_SCHEMA_ID; - // schema ids start from 1, and we need to add that many to make sure our desired id exists - for _ in 0..=SCHEMA_SIZE { - assert_ok!(create_intent_and_schema::(PayloadLocation::OnChain)); - } + // The schemas pallet genesis does not contain an OnChain intent/schema, so we create one here + let intent_id = T::SchemaBenchmarkHelper::create_intent( + b"benchmark.onchain-intent".to_vec(), + PayloadLocation::OnChain, + Vec::default(), + )?; + + let schema_id = T::SchemaBenchmarkHelper::create_schema( + intent_id, + Vec::from(r#"{"Name": "Bond", "Code": "007"}"#.as_bytes()), + ModelType::AvroBinary, + PayloadLocation::OnChain, + )?; assert_ok!(T::MsaBenchmarkHelper::add_key(ProviderId(1).into(), caller.clone())); assert_ok!(T::MsaBenchmarkHelper::set_delegation_relationship( @@ -99,15 +94,15 @@ mod benchmarks { let payload = vec![1; n as usize]; for _ in 1..MAX_MESSAGES_IN_BLOCK { - assert_ok!(onchain_message::(schema_id)); + assert_ok!(onchain_message::(intent_id, schema_id)); } #[extrinsic_call] _(RawOrigin::Signed(caller), Some(message_source_id.into()), schema_id, payload); assert_eq!( - MessagesPallet::::get_messages_by_schema_and_block( - schema_id, + MessagesPallet::::get_messages_by_intent_and_block( + intent_id, PayloadLocation::OnChain, BlockNumberFor::::one() ) @@ -125,22 +120,19 @@ mod benchmarks { .as_bytes() .to_vec(); let schema_id = IPFS_SCHEMA_ID; + let intent_id = IPFS_SCHEMA_ID as IntentId; - // schema ids start from 1, and we need to add that many to make sure our desired id exists - for _ in 0..=SCHEMA_SIZE { - assert_ok!(create_intent_and_schema::(PayloadLocation::IPFS)); - } assert_ok!(T::MsaBenchmarkHelper::add_key(ProviderId(1).into(), caller.clone())); for _ in 1..MAX_MESSAGES_IN_BLOCK { - assert_ok!(ipfs_message::(schema_id)); + assert_ok!(ipfs_message::(intent_id, schema_id)); } #[extrinsic_call] _(RawOrigin::Signed(caller), schema_id, cid, IPFS_PAYLOAD_LENGTH); assert_eq!( - MessagesPallet::::get_messages_by_schema_and_block( - schema_id, + MessagesPallet::::get_messages_by_intent_and_block( + intent_id, PayloadLocation::IPFS, BlockNumberFor::::one() ) @@ -150,6 +142,59 @@ mod benchmarks { Ok(()) } + /// Benchmark a single step of the `v3::MigrateV2ToV3` migration. Here we benchmark the cost + /// to migrate a _single record_. This weight is then used in the migration itself for self-metering. + #[benchmark] + fn v2_to_v3_step() { + let payload: BoundedVec = + vec![1; 3072].try_into().expect("Unable to create BoundedVec payload"); + let old_message = migration::v2::Message { + payload: payload.clone(), + provider_msa_id: 1u64, + msa_id: Some(1u64), + }; + migration::v2::MessagesV2::::insert( + (BlockNumberFor::::default(), 0u16, 0u16), + old_message, + ); + let mut meter = WeightMeter::new(); + + #[block] + { + migration::v3::MigrateV2ToV3::>::step(None, &mut meter) + .expect("migration call failed"); + } + + // Check that the new storage is decodable: + let new_message = + crate::MessagesV3::::try_get((BlockNumberFor::::default(), 0u16, 0u16)); + assert!(new_message.is_ok()); + let new_message = new_message.expect("Unable to fetch migrated message"); + assert_eq!(Some(1u64), new_message.msa_id); + assert_eq!(1u64, new_message.provider_msa_id); + assert_eq!(payload, new_message.payload); + assert_eq!(new_message.schema_id, 0u16); + } + + /// Benchmark a single step of the `v3::FinalizeV3Migration` migration. Here we benchmark the cost + /// to migrate a _single record_. This weight is then used in the migration itself for self-metering. + #[benchmark] + fn v2_to_v3_final_step() { + let mut meter = WeightMeter::new(); + + #[block] + { + migration::v3::FinalizeV3Migration::>::step( + None, &mut meter, + ) + .expect("final storage version migration failed"); + } + + // Check that the storage version was correctly set + let new_version = Pallet::::on_chain_storage_version(); + assert_eq!(new_version, StorageVersion::new(3)); + } + impl_benchmark_test_suite!( MessagesPallet, crate::tests::mock::new_test_ext(), diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs index df46c198b8..cbbfd8353a 100644 --- a/pallets/messages/src/lib.rs +++ b/pallets/messages/src/lib.rs @@ -29,13 +29,19 @@ pub mod weights; mod types; +/// Storage migrations +pub mod migration; + use core::{convert::TryInto, fmt::Debug}; use frame_support::{ensure, pallet_prelude::Weight, traits::Get, BoundedVec}; use sp_runtime::DispatchError; extern crate alloc; +extern crate core; + use alloc::vec::Vec; use common_primitives::{ + cid::*, messages::*, msa::{ DelegatorId, MessageSourceId, MsaLookup, MsaValidator, ProviderId, SchemaGrantValidator, @@ -52,7 +58,7 @@ pub use pallet::*; pub use types::*; pub use weights::*; -use cid::Cid; +use common_primitives::node::BlockNumber; use frame_system::pallet_prelude::*; #[frame_support::pallet] @@ -61,7 +67,7 @@ pub mod pallet { use frame_support::pallet_prelude::*; /// The current storage version. - pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(2); + pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(3); #[pallet::config] pub trait Config: frame_system::Config { @@ -103,12 +109,13 @@ pub mod pallet { #[pallet::whitelist_storage] pub(super) type BlockMessageIndex = StorageValue<_, MessageIndex, ValueQuery>; + /// Storage for messages #[pallet::storage] - pub(super) type MessagesV2 = StorageNMap< + pub(super) type MessagesV3 = StorageNMap< _, ( storage::Key>, - storage::Key, + storage::Key, storage::Key, ), Message, @@ -173,7 +180,7 @@ pub mod pallet { #[pallet::call] impl Pallet { /// Adds a message for a resource hosted on IPFS. The input consists of - /// both a Base32-encoded [CID](https://docs.ipfs.tech/concepts/content-addressing/#version-1-v1) + /// a Base32-encoded [CID](https://docs.ipfs.tech/concepts/content-addressing/#version-1-v1) /// as well as a 32-bit content length. The stored payload will contain the /// CID encoded as binary, as well as the 32-bit message content length. /// The actual message content will be on IPFS. @@ -182,13 +189,13 @@ pub mod pallet { /// * [`Event::MessagesInBlock`] - Messages Stored in the block /// /// # Errors - /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large - /// * [`Error::InvalidSchemaId`] - Schema not found - /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location - /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA - /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full - /// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0) - /// * [`Error::InvalidCid`] - Unable to parse provided CID + /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large. + /// * [`Error::InvalidSchemaId`] - Schema not found. + /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location. + /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA. + /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full. + /// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0). + /// * [`Error::InvalidCid`] - Unable to parse provided CID. /// #[pallet::call_index(0)] #[pallet::weight(T::WeightInfo::add_ipfs_message())] @@ -218,6 +225,7 @@ pub mod pallet { provider_msa_id, None, bounded_payload, + schema.intent_id, schema_id, current_block, )? { @@ -235,12 +243,12 @@ pub mod pallet { /// * [`Event::MessagesInBlock`] - In the next block /// /// # Errors - /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large - /// * [`Error::InvalidSchemaId`] - Schema not found - /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location - /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA - /// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA - /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full + /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large. + /// * [`Error::InvalidSchemaId`] - Schema not found. + /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location. + /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA. + /// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA. + /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full. /// #[pallet::call_index(1)] #[pallet::weight(T::WeightInfo::add_onchain_message(payload.len() as u32))] @@ -285,6 +293,7 @@ pub mod pallet { provider_msa_id, Some(maybe_delegator.into()), bounded_payload, + schema.intent_id, schema_id, current_block, )? { @@ -309,18 +318,20 @@ impl Pallet { provider_msa_id: MessageSourceId, msa_id: Option, payload: BoundedVec, + intent_id: IntentId, schema_id: SchemaId, current_block: BlockNumberFor, ) -> Result { let index = BlockMessageIndex::::get(); let first = index == 0; let msg = Message { + schema_id, payload, // size is checked on top of extrinsic provider_msa_id, msa_id, }; - >::insert((current_block, schema_id, index), msg); + >::insert((current_block, intent_id, index), msg); BlockMessageIndex::::set(index.saturating_add(1)); Ok(first) } @@ -336,27 +347,28 @@ impl Pallet { .map_err(|_| Error::::InvalidMessageSourceAccount)?) } - /// Gets a messages for a given schema-id and block-number. + /// Gets messages for a given IntentId and block number. /// - /// Payload location is included to map to correct response (To avoid fetching the schema in this method) + /// Payload location is included to map to correct response (To avoid fetching the Intent in this method) /// - /// Result is a vector of [`MessageResponse`]. + /// Result is a vector of [`MessageResponseV2`]. /// - pub fn get_messages_by_schema_and_block( - schema_id: SchemaId, - schema_payload_location: PayloadLocation, + pub fn get_messages_by_intent_and_block( + intent_id: IntentId, + payload_location: PayloadLocation, block_number: BlockNumberFor, - ) -> Vec { - let block_number_value: u32 = block_number.try_into().unwrap_or_default(); + ) -> Vec { + let block_number_value: BlockNumber = block_number.try_into().unwrap_or_default(); - match schema_payload_location { + match payload_location { PayloadLocation::Itemized | PayloadLocation::Paginated => Vec::new(), _ => { - let mut messages: Vec<_> = >::iter_prefix((block_number, schema_id)) - .map(|(index, msg)| { - msg.map_to_response(block_number_value, schema_payload_location, index) - }) - .collect(); + let mut messages: Vec = + MessagesV3::::iter_prefix((block_number, intent_id)) + .filter_map(|(index, msg)| { + msg.map_to_response((block_number_value, payload_location, index)) + }) + .collect(); messages.sort_by(|a, b| a.index.cmp(&b.index)); messages }, @@ -370,16 +382,9 @@ impl Pallet { /// * [`Error::InvalidCid`] - Unable to parse provided CID /// pub fn validate_cid(in_cid: &[u8]) -> Result, DispatchError> { - // Decode SCALE encoded CID into string slice - let cid_str: &str = core::str::from_utf8(in_cid).map_err(|_| Error::::InvalidCid)?; - ensure!(cid_str.len() > 2, Error::::InvalidCid); - // starts_with handles Unicode multibyte characters safely - ensure!(!cid_str.starts_with("Qm"), Error::::UnsupportedCidVersion); - - // Assume it's a multibase-encoded string. Decode it to a byte array so we can parse the CID. - let cid_b = multibase::decode(cid_str).map_err(|_| Error::::InvalidCid)?.1; - ensure!(Cid::read_bytes(&cid_b[..]).is_ok(), Error::::InvalidCid); - - Ok(cid_b) + Ok(validate_cid(in_cid).map_err(|e| match e { + CidError::UnsupportedCidVersion => Error::::UnsupportedCidVersion, + _ => Error::::InvalidCid, + })?) } } diff --git a/pallets/messages/src/migration/mod.rs b/pallets/messages/src/migration/mod.rs new file mode 100644 index 0000000000..82f8d8762a --- /dev/null +++ b/pallets/messages/src/migration/mod.rs @@ -0,0 +1,9 @@ +/// Migration module for migrating from V2 to V3 +pub mod v2; + +#[cfg(all(test, not(feature = "runtime-benchmarks")))] +mod tests; +/// Migration module for migrating from V2 to V3 +pub mod v3; + +pub use v3::{FinalizeV3Migration, MigrateV2ToV3}; diff --git a/pallets/messages/src/migration/tests.rs b/pallets/messages/src/migration/tests.rs new file mode 100644 index 0000000000..4c33f2f826 --- /dev/null +++ b/pallets/messages/src/migration/tests.rs @@ -0,0 +1,70 @@ +use crate::{ + migration::v2, + pallet, + tests::mock::{ + new_test_ext, run_to_block_with_migrations, AllPalletsWithSystem, MigratorServiceWeight, + System, Test as T, + }, + weights, +}; +use frame_support::{pallet_prelude::StorageVersion, traits::OnRuntimeUpgrade, BoundedVec}; +use frame_system::pallet_prelude::BlockNumberFor; +use pallet_migrations::WeightInfo as _; + +#[test] +fn lazy_migration_works() { + new_test_ext().execute_with(|| { + const MESSAGE_COUNT: u16 = 500; + const ITEMS_PER_BLOCK: u16 = 16; + const BLOCKS_PER_RUN: u16 = MESSAGE_COUNT.div_ceil(ITEMS_PER_BLOCK); + frame_support::__private::sp_tracing::try_init_simple(); + // Insert some values into the old storage map. + let payload: BoundedVec::MessagesMaxPayloadSizeBytes> = + vec![1; 3072].try_into().expect("Unable to create BoundedVec payload"); + let old_message = + v2::Message { payload: payload.clone(), provider_msa_id: 1u64, msa_id: Some(1u64) }; + let mut keys: Vec<(BlockNumberFor, u16, u16)> = Vec::new(); + for i in 0..MESSAGE_COUNT { + keys.push((BlockNumberFor::::default(), 0u16, i)); + v2::MessagesV2::::insert( + (BlockNumberFor::::default(), 0u16, i), + old_message.clone(), + ); + } + + // Give it enough weight do do exactly 16 iterations: + let limit = ::WeightInfo::progress_mbms_none() + + pallet_migrations::Pallet::::exec_migration_max_weight() + + as crate::weights::WeightInfo>::v2_to_v3_step() * + ITEMS_PER_BLOCK as u64; + MigratorServiceWeight::set(&limit); + + StorageVersion::new(2).put::>(); + System::set_block_number(1); + AllPalletsWithSystem::on_runtime_upgrade(); // onboard MBMs + + let mut v2_expected: u16 = 500; + let mut v3_expected: u16 = 0; + for block in 2..=(BLOCKS_PER_RUN + 2) { + run_to_block_with_migrations(block as u32); + + v2_expected = v2_expected.saturating_sub(ITEMS_PER_BLOCK); + v3_expected += ITEMS_PER_BLOCK; + v3_expected = u16::min(v3_expected, MESSAGE_COUNT); + + let v2_count = v2::MessagesV2::::iter().count(); + let v3_count = crate::MessagesV3::::iter().count(); + assert_eq!(v2_expected as usize, v2_count); + assert_eq!(v3_expected as usize, v3_count); + } + + // Check that everything is in new storage now: + for i in keys.iter() { + assert!(crate::MessagesV3::::contains_key(i)); + } + + // Verify that the version is updated: + let current_version = StorageVersion::get::>(); + assert_eq!(current_version, 3); + }); +} diff --git a/pallets/messages/src/migration/v2/mod.rs b/pallets/messages/src/migration/v2/mod.rs new file mode 100644 index 0000000000..5c7b3f7550 --- /dev/null +++ b/pallets/messages/src/migration/v2/mod.rs @@ -0,0 +1,40 @@ +use crate::{Config, MessageIndex, Pallet}; +use common_primitives::{msa::MessageSourceId, schema::SchemaId}; +use core::fmt::Debug; +use frame_support::{pallet_prelude::*, storage_alias}; +use frame_system::pallet_prelude::BlockNumberFor; +use sp_runtime::codec::{Decode, Encode}; + +/// Storage for messages in v2 and lower +/// - Key: (block_number, schema_id, message_index) +/// - Value: Message +#[storage_alias] +pub(crate) type MessagesV2 = StorageNMap< + Pallet, + ( + storage::Key>, + storage::Key, + storage::Key, + ), + Message<::MessagesMaxPayloadSizeBytes>, + OptionQuery, +>; + +/// A single message type definition for V2 storage +#[derive(Clone, Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] +#[scale_info(skip_type_params(MaxDataSize))] +#[codec(mel_bound(MaxDataSize: MaxEncodedLen))] +pub struct Message +where + MaxDataSize: Get + Debug, +{ + /// Data structured by the associated schema's model. + pub payload: BoundedVec, + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Message source account id (the original source). + pub msa_id: Option, +} diff --git a/pallets/messages/src/migration/v3.rs b/pallets/messages/src/migration/v3.rs new file mode 100644 index 0000000000..0432eb085b --- /dev/null +++ b/pallets/messages/src/migration/v3.rs @@ -0,0 +1,173 @@ +use crate::{migration::v2, pallet::MessagesV3, weights, Config, Message, MessageIndex, Pallet}; +#[cfg(feature = "try-runtime")] +use alloc::vec::Vec; +use common_primitives::schema::SchemaId; +use core::marker::PhantomData; +use frame_support::{ + migrations::{MigrationId, SteppedMigration, SteppedMigrationError}, + pallet_prelude::StorageVersion, + weights::WeightMeter, +}; +use frame_system::pallet_prelude::BlockNumberFor; +#[cfg(feature = "try-runtime")] +use parity_scale_codec::{Decode, Encode}; + +const LOG_TARGET: &str = "pallet::messages::migration::v3"; + +fn convert_from_old( + id: SchemaId, + old_info: v2::Message, +) -> Message { + Message { + schema_id: id, + payload: old_info.payload, + provider_msa_id: old_info.provider_msa_id, + msa_id: old_info.msa_id, + } +} + +/// Migrates the items of the [`v2::MessagesV2`] map to [`crate::MessagesV3`] +/// +/// The `step` function will be called once per block. It is very important that this function +/// *never* panics and never uses more weight than it got in its meter. The migrations should also +/// try to make maximal progress per step, so that the total time it takes to migrate stays low. +pub struct MigrateV2ToV3(PhantomData<(T, W)>); +impl SteppedMigration for MigrateV2ToV3 { + type Cursor = (BlockNumberFor, SchemaId, MessageIndex); + // Without the explicit length here the construction of the ID would not be infallible. + type Identifier = MigrationId<31>; + + /// The identifier of this migration. Which should be globally unique. + fn id() -> Self::Identifier { + MigrationId { + pallet_id: *b"pallet::messages::migration::v3", + version_from: 2, + version_to: 3, + } + } + + /// The actual logic of the migration. + /// + /// This function is called repeatedly until it returns `Ok(None)`, indicating that the + /// migration is complete. Ideally, the migration should be designed in such a way that each + /// step consumes as much weight as possible. + fn step( + cursor: Option, + meter: &mut WeightMeter, + ) -> Result, SteppedMigrationError> { + let required = W::v2_to_v3_step(); + // If there is not enough weight for a single step, return an error. This case can be + // problematic if it is the first migration that ran in this block. But there is nothing + // that we can do about it here. + if meter.remaining().any_lt(required) { + return Err(SteppedMigrationError::InsufficientWeight { required }); + } + + let mut count = 0u32; + + let mut iter = v2::MessagesV2::::drain(); + let mut last_cursor = cursor; + + // We loop here to do as much progress as possible per step. + loop { + if meter.try_consume(required).is_err() { + break; + } + + // If there's a next item in the iterator, perform the migration. + if let Some(((block_number, schema_id, index), value)) = iter.next() { + count += 1; + // Migrate the inner value to the new structure + let value = convert_from_old::(schema_id, value); + // We can just insert here since the old and the new map share the same key-space. + MessagesV3::::insert((block_number, schema_id, index), value); + last_cursor = Some((block_number, schema_id, index)); + } else { + log::info!(target: LOG_TARGET, "Migrated final {} messages", count); + return Ok(None); + } + } + + log::info!(target: LOG_TARGET, "Migrated {} messages", count); + Ok(last_cursor) + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result, frame_support::sp_runtime::TryRuntimeError> { + // Return the state of the storage before the migration. + Ok((v2::MessagesV2::::iter().count() as u32).encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(prev: Vec) -> Result<(), frame_support::sp_runtime::TryRuntimeError> { + // Check the state of the storage after the migration. + let prev_messages = + ::decode(&mut &prev[..]).expect("Failed to decode the previous storage state"); + + // Check the len of prev and post are the same. + assert_eq!( + MessagesV3::::iter().count() as u32, + prev_messages, + "Migration failed: the number of items in the storage after the migration is not the same as before" + ); + + Ok(()) + } +} + +/// Finalize the migration of [`v2::MessagesV2`] map to [`crate::MessagesV3`] +/// by updating the pallet storage version. +pub struct FinalizeV3Migration(PhantomData<(T, W)>); +impl SteppedMigration for FinalizeV3Migration { + type Cursor = (BlockNumberFor, SchemaId, MessageIndex); + // Without the explicit length here the construction of the ID would not be infallible. + type Identifier = MigrationId<40>; + + /// The identifier of this migration. Which should be globally unique. + fn id() -> Self::Identifier { + MigrationId { + pallet_id: *b"pallet::messages::migration::v3-finalize", + version_from: 2, + version_to: 3, + } + } + + /// Final migration step + fn step( + _cursor: Option, + meter: &mut WeightMeter, + ) -> Result, SteppedMigrationError> { + let required = W::v2_to_v3_final_step(); + // If there is not enough weight for a single step, return an error. This case can be + // problematic if it is the first migration that ran in this block. But there is nothing + // that we can do about it here. + if meter.try_consume(required).is_err() { + return Err(SteppedMigrationError::InsufficientWeight { required }); + } + StorageVersion::new(3).put::>(); + + log::info!(target: LOG_TARGET, "Finalized messages pallet migration: storage version set to 3"); + Ok(None) + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result, frame_support::sp_runtime::TryRuntimeError> { + // Return the storage version before the migration + Ok(StorageVersion::get::>().encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(prev: Vec) -> Result<(), frame_support::sp_runtime::TryRuntimeError> { + // Check the state of the storage after the migration. + let prev_version = ::decode(&mut &prev[..]) + .expect("Failed to decode the previous storage state"); + + // Check the len of prev and post are the same. + assert!( + StorageVersion::get::>() > prev_version, + "Migration failed: current storage version is not greater than the previous storage version" + ); + + Ok(()) + } +} diff --git a/pallets/messages/src/rpc/src/lib.rs b/pallets/messages/src/rpc/src/lib.rs index 2bfa3004fa..b59a13f412 100644 --- a/pallets/messages/src/rpc/src/lib.rs +++ b/pallets/messages/src/rpc/src/lib.rs @@ -93,7 +93,8 @@ where let at = self.client.info().best_hash; // Schema Fetch and Check - let schema: SchemaResponseV2 = match api.get_schema_by_id(at, schema_id) { + #[allow(deprecated)] + let schema: SchemaResponse = match api.get_schema_by_id(at, schema_id) { Ok(Some(s)) => s, _ => fail!(MessageRpcError::InvalidSchemaId), }; @@ -104,6 +105,7 @@ where let mut from_index = pagination.from_index; 'loops: for block_number in from..to { + #[allow(deprecated)] let list: Vec = api .get_messages_by_schema_and_block( at, diff --git a/pallets/messages/src/rpc/src/tests/mod.rs b/pallets/messages/src/rpc/src/tests/mod.rs index 9280c8f6be..805c2349de 100644 --- a/pallets/messages/src/rpc/src/tests/mod.rs +++ b/pallets/messages/src/rpc/src/tests/mod.rs @@ -36,16 +36,14 @@ fn test_messages() -> Vec { sp_api::mock_impl_runtime_apis! { impl MessagesRuntimeApi for TestRuntimeApi { - fn get_schema_by_id(schema_id: SchemaId) -> Option { + fn get_schema_by_id(schema_id: SchemaId) -> Option { match schema_id { - SCHEMA_ID_EMPTY | SCHEMA_ID_HAS_MESSAGES => Some(SchemaResponseV2 { + SCHEMA_ID_EMPTY | SCHEMA_ID_HAS_MESSAGES => Some(SchemaResponse { schema_id, - intent_id: schema_id, model: b"schema".to_vec(), model_type: ModelType::AvroBinary, payload_location: PayloadLocation::OnChain, settings: Vec::new(), - status: SchemaStatus::Active, }), _ => None, } diff --git a/pallets/messages/src/runtime-api/src/lib.rs b/pallets/messages/src/runtime-api/src/lib.rs index e4e57ee7a9..102f4a733e 100644 --- a/pallets/messages/src/runtime-api/src/lib.rs +++ b/pallets/messages/src/runtime-api/src/lib.rs @@ -34,10 +34,18 @@ sp_api::decl_runtime_apis! { pub trait MessagesRuntimeApi { /// Retrieve the messages for a particular schema and block number + // TODO: Remove once all RPC nodes have been updated to remove get_messages_by_schema_id RPC method + #[deprecated(note = "Please use get_messages_by_intent_and_block instead")] fn get_messages_by_schema_and_block(schema_id: SchemaId, schema_payload_location: PayloadLocation, block_number: BlockNumber) -> Vec; + /// Retrieve the messages for a particular intent and block range (paginated) + #[api_version(2)] + fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> BlockPaginationResponse; + /// Retrieve a schema by id - fn get_schema_by_id(schema_id: SchemaId) -> Option; + // TODO: Remove once all RPC nodes have been updated to call the schemas pallet runtime for this + #[deprecated(note = "Use SchemasRuntimeApi_get_schema_by_id instead")] + fn get_schema_by_id(schema_id: SchemaId) -> Option; } } diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index 9f4245c306..d6fb1e782c 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -6,9 +6,11 @@ use common_primitives::{ }, schema::*, }; +use frame_support::{derive_impl, pallet_prelude::Weight}; use frame_support::{ dispatch::DispatchResult, + migrations::MultiStepMigrator, parameter_types, traits::{ConstU16, ConstU32, OnFinalize, OnInitialize}, }; @@ -27,18 +29,23 @@ pub const INVALID_SCHEMA_ID: SchemaId = 65534; // this value should be the same as the one used in benchmarking pub const IPFS_SCHEMA_ID: SchemaId = 20; +pub const ON_CHAIN_SCHEMA_ID: SchemaId = 16001; + pub const IPFS_PAYLOAD_LENGTH: u32 = 1200; pub const DUMMY_CID_BASE32: &[u8; 59] = b"bafkreieb2x7yyuhy6hmct4j7tkmgnthrfpqyo4mt5nscx7pvc6oiweiwjq"; pub const DUMMY_CID_BASE64: &[u8; 49] = b"mAVUSIIHV/4xQ+PHYKfE/mphmzPEr4Ydxk+tkK/31F5yLERZM"; +pub const DUMMY_MSA_ID: u64 = 10; + // Configure a mock runtime to test the pallet. frame_support::construct_runtime!( pub enum Test { System: frame_system::{Pallet, Call, Config, Storage, Event}, MessagesPallet: pallet_messages::{Pallet, Call, Storage, Event}, + Migrator: pallet_migrations, } ); @@ -68,25 +75,34 @@ impl system::Config for Test { type OnSetCode = (); type MaxConsumers = ConstU32<16>; type SingleBlockMigrations = (); - type MultiBlockMigrator = (); + type MultiBlockMigrator = Migrator; type PreInherents = (); type PostInherents = (); type PostTransactions = (); type ExtensionsWeightInfo = (); } +frame_support::parameter_types! { + pub storage MigratorServiceWeight: Weight = Weight::from_parts(100, 100); // do not use in prod +} + +#[derive_impl(pallet_migrations::config_preludes::TestDefaultConfig)] +impl pallet_migrations::Config for Test { + #[cfg(not(feature = "runtime-benchmarks"))] + type Migrations = ( + crate::migration::v3::MigrateV2ToV3>, + crate::migration::v3::FinalizeV3Migration>, + ); + #[cfg(feature = "runtime-benchmarks")] + type Migrations = pallet_migrations::mock_helpers::MockedMigrations; + type MaxServiceWeight = MigratorServiceWeight; +} + pub type MaxSchemaGrantsPerDelegation = ConstU32<30>; // Needs parameter_types! for the impls below parameter_types! { - // Max payload size was picked specifically to be large enough to accommodate - // a CIDv1 using SHA2-256, but too small to accommodate CIDv1 w/SHA2-512. - // This is purely so that we can test the error condition. Real world configuration - // should have this set large enough to accommodate the largest possible CID. - // Take care when adding new tests for on-chain (not IPFS) messages that the payload - // is not too big. - pub const MessagesMaxPayloadSizeBytes: u32 = 73; - + pub const MessagesMaxPayloadSizeBytes: u32 = 1024 * 3; } impl std::fmt::Debug for MessagesMaxPayloadSizeBytes { @@ -276,6 +292,17 @@ pub fn run_to_block(n: u32) { } } +#[allow(dead_code)] +pub fn run_to_block_with_migrations(n: u32) { + System::run_to_block_with::( + n, + frame_system::RunToBlockHooks::default().after_initialize(|_| { + // Done by Executive: + ::MultiBlockMigrator::step(); + }), + ); +} + pub fn get_msa_from_account(account_id: u64) -> u64 { account_id + 100 } diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 9a7ce42e05..1fa3331068 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -1,49 +1,42 @@ -use crate::{tests::mock::*, BlockMessageIndex, Error, Event as MessageEvent, Message, MessagesV2}; -use common_primitives::{messages::MessageResponse, schema::*}; +extern crate alloc; +use crate::{ + pallet::{Config, MessagesV3}, + tests::mock::*, + BlockMessageIndex, Error, Event as MessageEvent, MapToResponse, Message, +}; +use alloc::vec::Vec; +use common_primitives::{messages::MessageResponseV2, schema::*}; use frame_support::{assert_err, assert_noop, assert_ok, traits::OnInitialize, BoundedVec}; use frame_system::{EventRecord, Phase}; use multibase::Base; use parity_scale_codec::Encode; #[allow(unused_imports)] use pretty_assertions::{assert_eq, assert_ne, assert_str_eq}; -use rand::Rng; -use serde::Serialize; use sp_core::ConstU32; -extern crate alloc; -use alloc::vec::Vec; - -#[derive(Serialize)] -#[allow(non_snake_case)] -struct Payload { - // Normally would be u64, but we keep it to u8 in order to keep payload size down in tests. - fromId: u8, - content: String, -} pub const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4nga34vhp7t5orzwbwwytvp7ej44r5yhjzneanqwb5arcnvuvfwo2d4qgzyx5hymvto4"; +pub const DUMMY_CID_SHA256: &str = "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea"; +pub const DUMMY_CID_BLAKE3: &str = "bafkr4ihn4xalcdzoyslzy2nvf5q6il7vwqjvdhhatpqpctijrxh6l5xzru"; /// Populate mocked Messages storage with message data. /// /// # Arguments -/// * `schema_id` - Registered schema id to which stored messages should adhere +/// * `intent_id` - Registered intent id with which to associate messages /// * `message_per_block` - A signed transaction origin from the provider /// * `payload_location` - Determines how a message payload is encoded. PayloadLocation::IPFS /// will encode (mock CID, IPFS_PAYLOAD_LENGTH) on the message payload. -fn populate_messages( - schema_id: SchemaId, +fn populate_messages_v3( + intent_id: IntentId, message_per_block: Vec, payload_location: PayloadLocation, cid_in: Option<&[u8]>, ) { - let cid = match cid_in { - Some(val) => val, - None => &DUMMY_CID_BASE32[..], - }; + let cid = cid_in.unwrap_or_else(|| &DUMMY_CID_BASE32[..]); let payload = match payload_location { // Just stick Itemized & Paginated here for coverage; we don't use them for Messages PayloadLocation::OnChain | PayloadLocation::Itemized | PayloadLocation::Paginated => - generate_payload(1, None), + generate_payload(1), PayloadLocation::IPFS => (multibase::decode(core::str::from_utf8(cid).unwrap()).unwrap().1, IPFS_PAYLOAD_LENGTH) .encode(), @@ -52,10 +45,11 @@ fn populate_messages( let mut counter = 0; for (idx, count) in message_per_block.iter().enumerate() { for _ in 0..*count { - MessagesV2::::set( - (idx as u32, schema_id, counter), + MessagesV3::::set( + (idx as u32, intent_id, counter), Some(Message { - msa_id: Some(10), + schema_id: intent_id as SchemaId, + msa_id: Some(DUMMY_MSA_ID), payload: payload.clone().try_into().unwrap(), provider_msa_id: 1, }), @@ -68,24 +62,9 @@ fn populate_messages( /// Helper function to generate message payloads /// /// # Arguments -/// * `num_items` - Number of message to include in payload -/// * `content_len` - Length of content string to generate -fn generate_payload(num_items: u8, content_len: Option) -> Vec { - let mut result_str = String::new(); - let size = content_len.unwrap_or(3); - let mut rng = rand::rng(); - - for _ in 0..num_items { - let payload = serde_json::to_string(&Payload { - fromId: rng.random(), - content: (0..size).map(|_| "X").collect::(), - }) - .unwrap(); - - result_str.push_str(payload.as_str()); - } - - result_str.as_bytes().to_vec() +/// * `content_len` - Length of payload +fn generate_payload(content_len: u32) -> Vec { + vec![1u8; content_len as usize].to_vec() } #[test] @@ -96,9 +75,9 @@ fn add_message_should_store_message_in_storage() { let caller_2 = 2; let schema_id_1: SchemaId = 1; let schema_id_2: SchemaId = 2; - let message_payload_1 = generate_payload(1, None); - let message_payload_2 = generate_payload(1, None); - let message_payload_3 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); + let message_payload_2 = generate_payload(1); + let message_payload_3 = generate_payload(1); // act assert_ok!(MessagesPallet::add_onchain_message( @@ -123,13 +102,14 @@ fn add_message_should_store_message_in_storage() { )); // assert messages - let msg1 = MessagesV2::::get((1, schema_id_1, 0u16)); - let msg2 = MessagesV2::::get((1, schema_id_2, 1u16)); - let msg3 = MessagesV2::::get((1, schema_id_2, 2u16)); + let msg1 = MessagesV3::::get((1, schema_id_1, 0u16)); + let msg2 = MessagesV3::::get((1, schema_id_2, 1u16)); + let msg3 = MessagesV3::::get((1, schema_id_2, 2u16)); assert_eq!( msg1, Some(Message { + schema_id: schema_id_1, msa_id: Some(get_msa_from_account(caller_1)), payload: message_payload_1.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_1) @@ -139,6 +119,7 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg2, Some(Message { + schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_2.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_2) @@ -148,6 +129,7 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg3, Some(Message { + schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_3.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_2) @@ -172,15 +154,15 @@ fn add_message_with_too_large_message_should_panic() { new_test_ext().execute_with(|| { // arrange let caller_1 = 5; - let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(3, None); + let message_payload_1 = + generate_payload(::MessagesMaxPayloadSizeBytes::get() + 1); // act assert_noop!( MessagesPallet::add_onchain_message( RuntimeOrigin::signed(caller_1), None, - schema_id_1, + ON_CHAIN_SCHEMA_ID, message_payload_1 ), Error::::ExceedsMaxMessagePayloadSizeBytes @@ -194,7 +176,7 @@ fn add_message_with_invalid_msa_account_errors() { // arrange let caller_1 = 1000; let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(2, None); + let message_payload_1 = generate_payload(2); // act assert_noop!( @@ -231,60 +213,62 @@ fn add_ipfs_message_with_invalid_msa_account_errors() { /// Assert that MessageResponse for IPFS messages returns the payload_length of the offchain message. #[test] -fn get_messages_by_schema_with_ipfs_payload_location_should_return_offchain_payload_length() { +fn get_messages_by_intent_with_ipfs_payload_location_should_return_offchain_payload_length() { new_test_ext().execute_with(|| { // Setup - let schema_id: SchemaId = IPFS_SCHEMA_ID; - let current_block = 1; + let intent_id = 1 as IntentId; + let current_block = 0; // Populate - populate_messages(schema_id, vec![1], PayloadLocation::IPFS, None); + populate_messages_v3(intent_id, vec![1], PayloadLocation::IPFS, None); // Run to the block + run_to_block(current_block + 1); let list = - MessagesPallet::get_messages_by_schema_and_block(schema_id, PayloadLocation::IPFS, 0); + MessagesPallet::get_messages_by_intent_and_block(intent_id, PayloadLocation::IPFS, 0); // IPFS messages should return the payload length that was encoded in a tuple along // with the CID: (cid, payload_length). assert_eq!(list.len(), 1); assert_eq!( list[0], - MessageResponse { + MessageResponseV2 { + schema_id: intent_id, payload: None, index: 0, provider_msa_id: 1, block_number: 0, payload_length: Some(IPFS_PAYLOAD_LENGTH), - msa_id: None, - cid: Some(DUMMY_CID_BASE32.to_vec()) + msa_id: Some(DUMMY_MSA_ID), + cid: Some(DUMMY_CID_BASE32.to_vec()), + ..Default::default() } ); }); } #[test] -fn retrieved_ipfs_message_should_always_be_in_base32() { +fn retrieved_ipfs_message_cid_should_always_be_in_base32() { new_test_ext().execute_with(|| { - let schema_id = IPFS_SCHEMA_ID; + let intent_id = 1 as IntentId; let current_block: u32 = 1; // Populate message storage using Base64-encoded CID - populate_messages(schema_id, vec![1], PayloadLocation::IPFS, Some(DUMMY_CID_BASE64)); + populate_messages_v3(intent_id, vec![1], PayloadLocation::IPFS, Some(DUMMY_CID_BASE64)); // Run to the block run_to_block(current_block + 1); let list = - MessagesPallet::get_messages_by_schema_and_block(schema_id, PayloadLocation::IPFS, 0); + MessagesPallet::get_messages_by_intent_and_block(intent_id, PayloadLocation::IPFS, 0); assert_eq!(list[0].cid.as_ref().unwrap(), &DUMMY_CID_BASE32.to_vec()); }) } #[test] -fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { +fn decode_message_with_invalid_payload_should_yield_empty_cid() { new_test_ext().execute_with(|| { let bad_message: Message = Message { payload: BoundedVec::try_from( @@ -293,8 +277,10 @@ fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { .unwrap(), msa_id: Some(0), provider_msa_id: 1, + schema_id: 1, }; - let mapped_response = bad_message.map_to_response(0, PayloadLocation::IPFS, 0); + let mapped_response: MessageResponseV2 = + bad_message.map_to_response((0, PayloadLocation::IPFS, 0)).unwrap(); assert_eq!( mapped_response.cid, Some(multibase::encode(Base::Base32Lower, Vec::new()).as_bytes().to_vec()) @@ -309,7 +295,7 @@ fn add_message_via_non_delegate_should_fail() { let message_producer = 1; let message_provider = 2000; let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); // act assert_err!( MessagesPallet::add_onchain_message( @@ -322,18 +308,18 @@ fn add_message_via_non_delegate_should_fail() { ); // assert - let msg = MessagesV2::::get((1, schema_id_1, 0)); + let msg = MessagesV3::::get((1, schema_id_1, 0)); assert_eq!(msg, None); }); } #[test] -fn add_message_with_invalid_schema_id_should_error() { +fn add_onchain_message_with_invalid_schema_id_should_error() { new_test_ext().execute_with(|| { // arrange let caller_1 = 5; let schema_id_1: SchemaId = INVALID_SCHEMA_ID; - let message_payload_1 = generate_payload(2, None); + let message_payload_1 = generate_payload(2); // act assert_err!( @@ -383,7 +369,7 @@ fn valid_payload_location_ipfs() { } #[test] -fn invalid_payload_location_ipfs() { +fn add_ipfs_message_with_invalid_payload_location_should_fail() { new_test_ext().execute_with(|| { let caller_1 = 5; let schema_id_1: SchemaId = 1; @@ -401,10 +387,10 @@ fn invalid_payload_location_ipfs() { } #[test] -fn invalid_payload_location_onchain() { +fn add_onchain_message_with_invalid_payload_location_should_fail() { new_test_ext().execute_with(|| { let caller_1 = 5; - let payload = generate_payload(1, None); + let payload = generate_payload(1); assert_noop!( MessagesPallet::add_onchain_message( @@ -419,7 +405,7 @@ fn invalid_payload_location_onchain() { } #[test] -fn add_ipfs_message_with_large_payload_errors() { +fn add_ipfs_message_with_unsupported_cid_hash_should_fail() { new_test_ext().execute_with(|| { let caller_1 = 5u64; @@ -427,15 +413,41 @@ fn add_ipfs_message_with_large_payload_errors() { MessagesPallet::add_ipfs_message( RuntimeOrigin::signed(caller_1), IPFS_SCHEMA_ID, - // We've deliberately mocked MaxMessagePayloadSizeBytes to be too small to contain a CIDv1 with a SHA2-512 hash. DUMMY_CID_SHA512.as_bytes().to_vec(), 15 ), - Error::::ExceedsMaxMessagePayloadSizeBytes + Error::::InvalidCid ); }) } +#[test] +fn add_ipfs_message_with_sha2_256_cid_should_succeed() { + new_test_ext().execute_with(|| { + let caller_1 = 5u64; + + assert_ok!(MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_SHA256.as_bytes().to_vec(), + 15 + )); + }) +} + +#[test] +fn add_ipfs_message_with_blake3_cid_should_succeed() { + new_test_ext().execute_with(|| { + let caller_1 = 5u64; + + assert_ok!(MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_BLAKE3.as_bytes().to_vec(), + 15 + )); + }) +} #[test] fn add_ipfs_message_cid_v0_errors() { new_test_ext().execute_with(|| { @@ -479,8 +491,8 @@ fn on_initialize_should_clean_up_temporary_storage() { let caller_2 = 2; let schema_id_1: SchemaId = 1; let schema_id_2: SchemaId = 2; - let message_payload_1 = generate_payload(1, None); - let message_payload_2 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); + let message_payload_2 = generate_payload(1); assert_ok!(MessagesPallet::add_onchain_message( RuntimeOrigin::signed(caller_1), None, @@ -502,109 +514,27 @@ fn on_initialize_should_clean_up_temporary_storage() { }); } -#[test] -fn validate_cid_invalid_utf8_errors() { - new_test_ext().execute_with(|| { - let bad_cid = vec![0xfc, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_too_short_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "a".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_v0_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "Qmxxx".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::UnsupportedCidVersion); - }) -} - -#[test] -fn validate_cid_invalid_multibase_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "aaaa".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_invalid_cid_errors() { - new_test_ext().execute_with(|| { - let bad_cid = multibase::encode(Base::Base32Lower, "foo").as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_valid_cid_succeeds() { - new_test_ext().execute_with(|| { - let bad_cid = DUMMY_CID_BASE32.to_vec(); - - assert_ok!(MessagesPallet::validate_cid(&bad_cid)); - }) -} - -#[test] -fn validate_cid_not_utf8_aligned_errors() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. - let bad_cid = vec![55, 197, 136, 0, 0, 0, 0, 0, 0, 0, 0]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_not_correct_format_errors() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. - let bad_cid = vec![0, 1, 0, 1, 203, 155, 0, 0, 0, 5, 67]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - // This should not panic, but should return an error. - let another_bad_cid = vec![241, 0, 0, 0, 0, 0, 128, 132, 132, 132, 58]; - - assert_noop!(MessagesPallet::validate_cid(&another_bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_unwrap_panics() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. - let bad_cid = vec![102, 70, 70, 70, 70, 70, 70, 70, 70, 48, 48, 48, 54, 53, 53, 48, 48]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - #[test] fn map_to_response_on_chain() { let payload_vec = b"123456789012345678901234567890".to_vec(); let payload_bounded = BoundedVec::>::try_from(payload_vec.clone()).unwrap(); - let msg = Message { payload: payload_bounded, provider_msa_id: 10u64, msa_id: None }; - let expected = MessageResponse { - provider_msa_id: 10u64, + let msg = Message { + payload: payload_bounded, + provider_msa_id: DUMMY_MSA_ID, + msa_id: None, + schema_id: 1, + }; + let expected = MessageResponseV2 { + provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, msa_id: None, payload: Some(payload_vec), cid: None, payload_length: None, + schema_id: 1, }; - assert_eq!(msg.map_to_response(42, PayloadLocation::OnChain, 1), expected); + assert_eq!(msg.map_to_response((42, 1, PayloadLocation::OnChain, 1)).unwrap(), expected); } #[test] @@ -612,15 +542,16 @@ fn map_to_response_ipfs() { let cid = DUMMY_CID_SHA512; let payload_tuple: crate::OffchainPayloadType = (multibase::decode(cid).unwrap().1, 10); let payload = BoundedVec::>::try_from(payload_tuple.encode()).unwrap(); - let msg = Message { payload, provider_msa_id: 10u64, msa_id: None }; - let expected = MessageResponse { - provider_msa_id: 10u64, + let msg = Message { payload, provider_msa_id: 10u64, msa_id: None, schema_id: 1 }; + let expected = MessageResponseV2 { + provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, msa_id: None, payload: None, cid: Some(cid.as_bytes().to_vec()), payload_length: Some(10), + schema_id: 1, }; - assert_eq!(msg.map_to_response(42, PayloadLocation::IPFS, 1), expected); + assert_eq!(msg.map_to_response((42, 1, PayloadLocation::IPFS, 1)).unwrap(), expected); } diff --git a/pallets/messages/src/types.rs b/pallets/messages/src/types.rs index 2d92af899d..96e42674ce 100644 --- a/pallets/messages/src/types.rs +++ b/pallets/messages/src/types.rs @@ -1,5 +1,8 @@ use common_primitives::{ - messages::MessageResponse, msa::MessageSourceId, node::BlockNumber, schema::PayloadLocation, + messages::MessageResponse, + msa::MessageSourceId, + node::BlockNumber, + schema::{PayloadLocation, SchemaId}, }; use core::fmt::Debug; use frame_support::{traits::Get, BoundedVec}; @@ -8,6 +11,12 @@ use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use scale_info::TypeInfo; extern crate alloc; use alloc::vec::Vec; +use common_primitives::messages::MessageResponseV2; + +/// Multihash hash function code for SHA2-256 +pub const SHA2_256: u64 = 0x12; +/// Multihash hash function code for Blake3 +pub const BLAKE3: u64 = 0x1e; /// Payloads stored offchain contain a tuple of (bytes(the payload reference), payload length). pub type OffchainPayloadType = (Vec, u32); @@ -31,51 +40,131 @@ where pub provider_msa_id: MessageSourceId, /// Message source account id (the original source). pub msa_id: Option, + /// The SchemaId of the schema that defines the format of the payload + pub schema_id: SchemaId, } -impl Message -where - MaxDataSize: Get + Debug, +/// Trait for converting message storage to response type +pub trait MapToResponse { + /// Maps a stored message to an RPC response + fn map_to_response(&self, index_values: I) -> Option; +} + +impl + Debug> + MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponse> for Message +{ + fn map_to_response( + &self, + index_values: (BlockNumber, PayloadLocation, u16), + ) -> Option { + let (block_number, payload_location, index) = index_values; + let base_response = MessageResponse { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + ..Default::default() + }; + + match payload_location { + PayloadLocation::OnChain => Some(MessageResponse { + payload: Some(self.payload.to_vec()), + cid: None, + payload_length: None, + ..base_response + }), + PayloadLocation::IPFS => { + let (binary_cid, payload_length) = + OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); + Some(MessageResponse { + cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), + payload_length: Some(payload_length), + payload: None, + ..base_response + }) + }, // Message types of Itemized and Paginated are retrieved differently + _ => None, + } + } +} + +impl + Debug> + MapToResponse<(BlockNumber, SchemaId, PayloadLocation, u16), MessageResponseV2> + for Message { - /// Helper function to handle response type [`MessageResponse`] depending on the Payload Location (on chain or IPFS) - pub fn map_to_response( + /// Helper function to handle response type [`MessageResponseV2`] depending on the Payload Location (on chain or IPFS) + fn map_to_response( &self, - block_number: BlockNumber, - payload_location: PayloadLocation, - index: u16, - ) -> MessageResponse { + index_values: (BlockNumber, SchemaId, PayloadLocation, u16), + ) -> Option { + let (block_number, schema_id, payload_location, index) = index_values; + let base_response = MessageResponseV2 { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + schema_id, + ..Default::default() + }; + match payload_location { - PayloadLocation::OnChain => MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, - msa_id: self.msa_id, + PayloadLocation::OnChain => Some(MessageResponseV2 { payload: Some(self.payload.to_vec()), cid: None, payload_length: None, - }, + ..base_response + }), PayloadLocation::IPFS => { let (binary_cid, payload_length) = OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); - MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, + Some(MessageResponseV2 { cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), payload_length: Some(payload_length), - msa_id: None, payload: None, - } + ..base_response + }) }, // Message types of Itemized and Paginated are retrieved differently - _ => MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, - msa_id: None, - payload: None, + _ => None, + } + } +} + +impl + Debug> + MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponseV2> for Message +{ + /// Helper function to handle response type [`MessageResponseV2`] depending on the Payload Location (on chain or IPFS) + fn map_to_response( + &self, + index_values: (BlockNumber, PayloadLocation, u16), + ) -> Option { + let (block_number, payload_location, index) = index_values; + let base_response = MessageResponseV2 { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + schema_id: self.schema_id, + ..Default::default() + }; + + match payload_location { + PayloadLocation::OnChain => Some(MessageResponseV2 { + payload: Some(self.payload.to_vec()), cid: None, payload_length: None, - }, + ..base_response + }), + PayloadLocation::IPFS => { + let (binary_cid, payload_length) = + OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); + Some(MessageResponseV2 { + cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), + payload_length: Some(payload_length), + payload: None, + ..base_response + }) + }, // Message types of Itemized and Paginated are retrieved differently + _ => None, } } } diff --git a/pallets/messages/src/weights.rs b/pallets/messages/src/weights.rs index ce9a7e2604..f0a1e01ea4 100644 --- a/pallets/messages/src/weights.rs +++ b/pallets/messages/src/weights.rs @@ -35,6 +35,8 @@ use core::marker::PhantomData; pub trait WeightInfo { fn add_onchain_message(n: u32, ) -> Weight; fn add_ipfs_message() -> Weight; + fn v2_to_v3_step() -> Weight; + fn v2_to_v3_final_step() -> Weight; } /// Weights for `pallet_messages` using the Substrate node and recommended hardware. @@ -75,6 +77,29 @@ impl WeightInfo for SubstrateWeight { .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Storage: `Messages::MessagesV3` (r:0 w:1) + /// Proof: `Messages::MessagesV3` (`max_values`: None, `max_size`: Some(3125), added: 5600, mode: `MaxEncodedLen`) + fn v2_to_v3_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `3113` + // Estimated: `9548` + // Minimum execution time: 13_000_000 picoseconds. + Weight::from_parts(14_000_000, 9548) + .saturating_add(T::DbWeight::get().reads(2_u64)) + .saturating_add(T::DbWeight::get().writes(2_u64)) + } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + fn v2_to_v3_final_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `0` + // Estimated: `0` + // Minimum execution time: 1_000_000 picoseconds. + Weight::from_parts(2_000_000, 0) + .saturating_add(T::DbWeight::get().writes(1_u64)) + } } // For backwards compatibility and tests. @@ -114,6 +139,29 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Storage: `Messages::MessagesV3` (r:0 w:1) + /// Proof: `Messages::MessagesV3` (`max_values`: None, `max_size`: Some(3125), added: 5600, mode: `MaxEncodedLen`) + fn v2_to_v3_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `3113` + // Estimated: `9548` + // Minimum execution time: 13_000_000 picoseconds. + Weight::from_parts(14_000_000, 9548) + .saturating_add(RocksDbWeight::get().reads(2_u64)) + .saturating_add(RocksDbWeight::get().writes(2_u64)) + } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + fn v2_to_v3_final_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `0` + // Estimated: `0` + // Minimum execution time: 1_000_000 picoseconds. + Weight::from_parts(2_000_000, 0) + .saturating_add(RocksDbWeight::get().writes(1_u64)) + } } diff --git a/runtime/frequency/Cargo.toml b/runtime/frequency/Cargo.toml index 509a646eef..bc38979898 100644 --- a/runtime/frequency/Cargo.toml +++ b/runtime/frequency/Cargo.toml @@ -39,6 +39,7 @@ pallet-democracy = { workspace = true } pallet-collective = { workspace = true } pallet-session = { workspace = true } pallet-sudo = { workspace = true } +pallet-migrations = { workspace = true } pallet-multisig = { workspace = true } pallet-timestamp = { workspace = true } pallet-transaction-payment = { workspace = true } @@ -142,6 +143,7 @@ std = [ "pallet-passkey/std", "pallet-messages-runtime-api/std", "pallet-messages/std", + "pallet-migrations/std", "pallet-msa-runtime-api/std", "pallet-msa/std", "pallet-multisig/std", @@ -212,6 +214,7 @@ runtime-benchmarks = [ "pallet-handles/runtime-benchmarks", "pallet-passkey/runtime-benchmarks", "pallet-messages/runtime-benchmarks", + "pallet-migrations/runtime-benchmarks", "pallet-msa/runtime-benchmarks", "pallet-multisig/runtime-benchmarks", "pallet-preimage/runtime-benchmarks", @@ -254,6 +257,7 @@ try-runtime = [ "pallet-handles/try-runtime", "pallet-passkey/try-runtime", "pallet-messages/try-runtime", + "pallet-migrations/try-runtime", "pallet-msa/try-runtime", "pallet-multisig/try-runtime", "pallet-preimage/try-runtime", diff --git a/runtime/frequency/src/lib.rs b/runtime/frequency/src/lib.rs index 7723ea8bb9..df4dc3bf0b 100644 --- a/runtime/frequency/src/lib.rs +++ b/runtime/frequency/src/lib.rs @@ -142,7 +142,7 @@ use frame_system::{ }; use alloc::{boxed::Box, vec, vec::Vec}; - +use core::ops::ControlFlow; pub use sp_consensus_aura::sr25519::AuthorityId as AuraId; pub use sp_runtime::Perbill; @@ -159,7 +159,11 @@ pub use pallet_time_release::types::{ScheduleName, SchedulerProviderTrait}; // Polkadot Imports use polkadot_runtime_common::{BlockHashCount, SlowAdjustingFeeUpdate}; -use common_primitives::{capacity::UnclaimedRewardInfo, schema::*}; +use common_primitives::{ + capacity::UnclaimedRewardInfo, + messages::{BlockPaginationRequest, BlockPaginationResponse, MessageResponseV2}, + schema::*, +}; use common_runtime::weights::rocksdb_weights::constants::RocksDbWeight; pub use common_runtime::{ constants::MaxSchemaGrants, @@ -781,7 +785,7 @@ impl frame_system::Config for Runtime { /// A new way of configuring migrations that run in a single block. type SingleBlockMigrations = (); /// The migrator that is used to run Multi-Block-Migrations. - type MultiBlockMigrator = (); + type MultiBlockMigrator = MultiBlockMigrations; /// A callback that executes in *every block* directly before all inherents were applied. type PreInherents = (); /// A callback that executes in *every block* directly after all inherents were applied. @@ -1643,6 +1647,34 @@ impl pallet_utility::Config for Runtime { type WeightInfo = weights::pallet_utility::SubstrateWeight; } +parameter_types! { + pub MbmServiceWeight: Weight = Perbill::from_percent(80) * RuntimeBlockWeights::get().max_block; +} + +impl pallet_migrations::Config for Runtime { + type RuntimeEvent = RuntimeEvent; + #[cfg(not(feature = "runtime-benchmarks"))] + type Migrations = ( + pallet_messages::migration::MigrateV2ToV3< + Runtime, + pallet_messages::weights::SubstrateWeight, + >, + pallet_messages::migration::FinalizeV3Migration< + Runtime, + pallet_messages::weights::SubstrateWeight, + >, + ); + // Benchmarks need mocked migrations to guarantee that they succeed. + #[cfg(feature = "runtime-benchmarks")] + type Migrations = pallet_migrations::mock_helpers::MockedMigrations; + type CursorMaxLen = ConstU32<65_536>; + type IdentifierMaxLen = ConstU32<256>; + type MigrationStatusHandler = (); + type FailedMigrationHandler = frame_support::migrations::FreezeChainOnFailedMigration; + type MaxServiceWeight = MbmServiceWeight; + type WeightInfo = pallet_migrations::weights::SubstrateWeight; +} + // Create the runtime by composing the FRAME pallets that were previously configured. construct_runtime!( pub enum Runtime { @@ -1696,6 +1728,9 @@ construct_runtime!( // Substrate weights WeightReclaim: cumulus_pallet_weight_reclaim::{Pallet, Storage} = 50, + // Multi-block migrations + MultiBlockMigrations: pallet_migrations::{Pallet, Event} = 51, + // Frequency related pallets Msa: pallet_msa::{Pallet, Call, Storage, Event} = 60, Messages: pallet_messages::{Pallet, Call, Storage, Event} = 61, @@ -1746,6 +1781,7 @@ mod benches { [pallet_transaction_payment, TransactionPayment] [cumulus_pallet_xcmp_queue, XcmpQueue] [pallet_message_queue, MessageQueue] + [pallet_migrations, MultiBlockMigrations] // Frequency [pallet_msa, Msa] @@ -1972,14 +2008,72 @@ sp_api::impl_runtime_apis! { } // Frequency runtime APIs + #[api_version(2)] impl pallet_messages_runtime_api::MessagesRuntimeApi for Runtime { fn get_messages_by_schema_and_block(schema_id: SchemaId, schema_payload_location: PayloadLocation, block_number: BlockNumber,) -> Vec { - Messages::get_messages_by_schema_and_block(schema_id, schema_payload_location, block_number) + match Schemas::get_schema_by_id(schema_id) { + Some(SchemaResponseV2 { intent_id, .. }) => Messages::get_messages_by_intent_and_block( + intent_id, + schema_payload_location, + block_number, + ).into_iter().map(|r| r.into()).collect(), + _ => vec![], + } } - fn get_schema_by_id(schema_id: SchemaId) -> Option { - Schemas::get_schema_by_id(schema_id) + /// Retrieve the messages for a particular intent and block range (paginated) + fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> BlockPaginationResponse { + let mut response = BlockPaginationResponse::new(); + + // Request Validation + if !pagination.validate() { + return response + } + + // Schema Fetch and Check + let intent = match Schemas::get_intent_by_id(intent_id, false) { + Some(intent) => intent, + None => return response, + }; + + let mut from_index: u32 = pagination.from_index; + + (pagination.from_block..pagination.to_block).try_for_each(|block_number| { + let list: Vec = Messages::get_messages_by_intent_and_block( + intent_id, + intent.payload_location, + block_number, + ); + + // Max messages in a block are constrained to MessageIndex (u16) by the storage, + // so this is a safe type coercion. Just to be safe, we'll trap in in debug builds + let list_size: u32 = list.len() as u32; + debug_assert!(list_size <= u16::MAX.into(), "unexpected number of messages in block"); + + let iter = list.into_iter().skip((from_index as usize).saturating_sub(1)); + // all subsequent blocks in this call should start at index 0 + from_index = 0; + iter.enumerate().try_for_each(|(i, m)| { + response.content.push(m); + + if response.check_end_condition_and_set_next_pagination( + block_number, + i as u32, + list_size, + &pagination, + ) { + return ControlFlow::Break(()) + } + + ControlFlow::Continue(()) + }) + }); + response + } + + fn get_schema_by_id(schema_id: SchemaId) -> Option { + Schemas::get_schema_by_id(schema_id).map(|r| r.into()) } } diff --git a/tools/state-copy/README.md b/tools/state-copy/README.md index 5d614ed03d..f233dd2ac3 100644 --- a/tools/state-copy/README.md +++ b/tools/state-copy/README.md @@ -8,8 +8,7 @@ To maintain alignment with Mainnet, when a new schema is deployed on Mainnet, Te 1. In the Frequency codebase: `cd tools/state-copy` 2. `npm i` -3. Edit `schemas.mjs` - - Use the Testnet `DEST_URL`: `const DEST_URL = "wss://0.rpc.testnet.amplica.io";` - - Update the`const SUDO_URI = "//Alice";` to be the SUDO key for Testnet +3. Set the following environment variables (or run with the `env` command): + - `DEST_URL=wss://0.rpc.testnet.amplica.io` + - `SUDO_URI=` 4. `npm run schemas` -5. Remove the changes diff --git a/tools/state-copy/schemas.mjs b/tools/state-copy/schemas.mjs index 2b26956943..dff1a1adc3 100644 --- a/tools/state-copy/schemas.mjs +++ b/tools/state-copy/schemas.mjs @@ -5,18 +5,20 @@ import { copy } from "./copy.mjs"; // Set up sudo account (assumes //Alice has sudo access) -const SUDO_URI = "//Alice"; +const SUDO_URI = process.env.SUDO_URI || "//Alice"; // Testnet // const DEST_URL = "wss://0.rpc.testnet.amplica.io"; // const SOURCE_URL = "wss://0.rpc.testnet.amplica.io"; // Localhost -const DEST_URL = "ws://localhost:9944"; +// const DEST_URL = "ws://localhost:9944"; + +const DEST_URL = process.env.DEST_URL || "ws://localhost:9944"; // Mainnet -const SOURCE_URL = "wss://1.rpc.frequency.xyz"; -const STORAGE_KEY = "0xeec6f3c13d26ae2507c99b6751e19e76"; +const SOURCE_URL = process.env.SOURCE_URL || "wss://1.rpc.frequency.xyz"; +const STORAGE_KEY = process.env.STORAGE_KEY || "0xeec6f3c13d26ae2507c99b6751e19e76"; const FILTER_OUT = [ "0xeec6f3c13d26ae2507c99b6751e19e76d5d9c370c6c8aee1116ee09d6811b0d5", // governanceSchemaModelMaxBytes "0xeec6f3c13d26ae2507c99b6751e19e764e7b9012096b41c4eb3aaf947f6ea429", // palletVersion