From a6d28b49253111ed8183c62c990c9093b6d384e3 Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Wed, 17 Sep 2025 16:56:12 -0400 Subject: [PATCH 1/7] first pass - update messages pallet for intent-based message storage --- common/primitives/src/messages.rs | 60 +++++++- designdocs/schemas_protocols_intents.md | 2 +- pallets/messages/src/benchmarking.rs | 20 ++- pallets/messages/src/lib.rs | 73 ++++++--- pallets/messages/src/rpc/src/lib.rs | 4 +- pallets/messages/src/runtime-api/src/lib.rs | 10 +- pallets/messages/src/tests/mock.rs | 2 + pallets/messages/src/tests/other_tests.rs | 110 ++++++++++---- pallets/messages/src/types.rs | 157 ++++++++++++++++---- 9 files changed, 356 insertions(+), 82 deletions(-) diff --git a/common/primitives/src/messages.rs b/common/primitives/src/messages.rs index 91c5b374e2..86ba839383 100644 --- a/common/primitives/src/messages.rs +++ b/common/primitives/src/messages.rs @@ -1,6 +1,6 @@ #[cfg(feature = "std")] use crate::utils; -use crate::{msa::MessageSourceId, node::BlockNumber}; +use crate::{msa::MessageSourceId, node::BlockNumber, schema::SchemaId}; use parity_scale_codec::{Decode, Encode}; use scale_info::TypeInfo; #[cfg(feature = "std")] @@ -11,7 +11,7 @@ use alloc::{vec, vec::Vec}; #[cfg(feature = "std")] use utils::*; -/// A type for responding with an single Message in an RPC-call dependent on schema model +/// A type for responding with a single Message in an RPC-call dependent on schema model /// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length } /// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload } #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] @@ -20,7 +20,7 @@ pub struct MessageResponse { /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, /// indicating that the original source MSA is acting as its own provider. An id differing from that /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on - /// its behalf . + /// its behalf. pub provider_msa_id: MessageSourceId, /// Index in block to get total order. pub index: u16, @@ -45,6 +45,58 @@ pub struct MessageResponse { #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] pub payload_length: Option, } + +/// A type for responding with a single Message in an RPC-call dependent on schema model +/// IPFS, Parquet: { index, block_number, provider_msa_id, cid, payload_length } +/// Avro, OnChain: { index, block_number, provider_msa_id, msa_id, payload } +#[cfg_attr(feature = "std", derive(Serialize, Deserialize))] +#[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)] +pub struct MessageResponseV2 { + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Index in block to get total order. + pub index: u16, + /// Block-number for which the message was stored. + pub block_number: BlockNumber, + /// Message source account id (the original source). + #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] + pub msa_id: Option, + /// Serialized data in a the schemas. 
+ #[cfg_attr( + feature = "std", + serde(with = "as_hex_option", skip_serializing_if = "Option::is_none", default) + )] + pub payload: Option>, + /// The content address for an IPFS payload in Base32. Will always be CIDv1. + #[cfg_attr( + feature = "std", + serde(with = "as_string_option", skip_serializing_if = "Option::is_none", default) + )] + pub cid: Option>, + /// Offchain payload length (IPFS). + #[cfg_attr(feature = "std", serde(skip_serializing_if = "Option::is_none", default))] + pub payload_length: Option, + /// The SchemaId of the schema that defines the payload format + pub schema_id: SchemaId, +} + +impl Into for MessageResponseV2 { + fn into(self) -> MessageResponse { + MessageResponse { + provider_msa_id: self.provider_msa_id, + index: self.index, + block_number: self.block_number, + msa_id: self.msa_id, + payload: self.payload, + cid: self.cid, + payload_length: self.payload_length, + } + } +} + /// A type for requesting paginated messages. #[cfg_attr(feature = "std", derive(Serialize, Deserialize))] #[derive(Default, Clone, Encode, Decode, PartialEq, Debug, TypeInfo, Eq)] @@ -93,7 +145,7 @@ pub struct BlockPaginationResponse { } impl BlockPaginationResponse { - /// Generates a new empty Pagination request + /// Generates a new empty Pagination response pub const fn new() -> BlockPaginationResponse { BlockPaginationResponse { content: vec![], diff --git a/designdocs/schemas_protocols_intents.md b/designdocs/schemas_protocols_intents.md index 680dd5dcde..d95a9cfa0b 100644 --- a/designdocs/schemas_protocols_intents.md +++ b/designdocs/schemas_protocols_intents.md @@ -175,7 +175,7 @@ it's difficult or impossible to bifurcate the storage in the same way as the `me requiring a complete storage migration, new pages/items that are written can include a _storage version magic number_ in either the page or the item header. For `Paginated` storage, this value would precede the `PageNonce`; for `Itemized` storage the value would precede `payload_len`. The 'magic number' would be designed to be the same byte length as the -value currently a byte offset zero within the page/item, and to be a value such that conflict with a valid `nonce` or +value currently at byte offset zero within the page/item, and to be a value such that conflict with a valid `nonce` or `payload_len` would be highly unlikely, if not impossible. 
New structures would be defined, ie `PageV2` and `ItemizedItemV2`, and decoding values read from storage would need to diff --git a/pallets/messages/src/benchmarking.rs b/pallets/messages/src/benchmarking.rs index 1e8ebd7d9e..4b99316cd7 100644 --- a/pallets/messages/src/benchmarking.rs +++ b/pallets/messages/src/benchmarking.rs @@ -33,6 +33,7 @@ fn onchain_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), Some(message_source_id.into()), bounded_payload, + schema_id, // intent_id schema_id, BlockNumberFor::::one(), )?; @@ -51,6 +52,7 @@ fn ipfs_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), None, bounded_payload, + schema_id, // intent_id schema_id, BlockNumberFor::::one(), )?; @@ -73,6 +75,10 @@ fn create_intent_and_schema(location: PayloadLocation) -> DispatchRes Ok(()) } +fn set_storage_v3_block(block_number: BlockNumberFor) { + StorageV3BlockNumber::::put(block_number); +} + #[benchmarks] mod benchmarks { use super::*; @@ -84,6 +90,9 @@ mod benchmarks { let message_source_id = DelegatorId(2); let caller: T::AccountId = whitelisted_caller(); let schema_id = ON_CHAIN_SCHEMA_ID; + let intent_id = ON_CHAIN_SCHEMA_ID as IntentId; + + set_storage_v3_block::(BlockNumberFor::::one()); // schema ids start from 1, and we need to add that many to make sure our desired id exists for _ in 0..=SCHEMA_SIZE { @@ -106,8 +115,8 @@ mod benchmarks { _(RawOrigin::Signed(caller), Some(message_source_id.into()), schema_id, payload); assert_eq!( - MessagesPallet::::get_messages_by_schema_and_block( - schema_id, + MessagesPallet::::get_messages_by_intent_and_block( + intent_id, PayloadLocation::OnChain, BlockNumberFor::::one() ) @@ -125,6 +134,9 @@ mod benchmarks { .as_bytes() .to_vec(); let schema_id = IPFS_SCHEMA_ID; + let intent_id = IPFS_SCHEMA_ID as IntentId; + + set_storage_v3_block::(BlockNumberFor::::one()); // schema ids start from 1, and we need to add that many to make sure our desired id exists for _ in 0..=SCHEMA_SIZE { @@ -139,8 +151,8 @@ mod benchmarks { _(RawOrigin::Signed(caller), schema_id, cid, IPFS_PAYLOAD_LENGTH); assert_eq!( - MessagesPallet::::get_messages_by_schema_and_block( - schema_id, + MessagesPallet::::get_messages_by_intent_and_block( + intent_id, PayloadLocation::IPFS, BlockNumberFor::::one() ) diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs index df46c198b8..02f71e93cc 100644 --- a/pallets/messages/src/lib.rs +++ b/pallets/messages/src/lib.rs @@ -53,6 +53,7 @@ pub use types::*; pub use weights::*; use cid::Cid; +use common_primitives::node::BlockNumber; use frame_system::pallet_prelude::*; #[frame_support::pallet] @@ -61,7 +62,7 @@ pub mod pallet { use frame_support::pallet_prelude::*; /// The current storage version. 
- pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(2); + pub const STORAGE_VERSION: StorageVersion = StorageVersion::new(3); #[pallet::config] pub trait Config: frame_system::Config { @@ -103,6 +104,7 @@ pub mod pallet { #[pallet::whitelist_storage] pub(super) type BlockMessageIndex = StorageValue<_, MessageIndex, ValueQuery>; + /// Storage for messages in STORAGE_VERSION(2) and lower #[pallet::storage] pub(super) type MessagesV2 = StorageNMap< _, @@ -115,6 +117,24 @@ pub mod pallet { OptionQuery, >; + /// Storage for messages in STORAGE_VERSION(3) and higher + #[pallet::storage] + pub(super) type MessagesV3 = StorageNMap< + _, + ( + storage::Key>, + storage::Key, + storage::Key, + ), + MessageV3, + OptionQuery, + >; + + /// The block number at which the STORAGE_VERSION(3) migration was completed + #[pallet::storage] + pub(super) type StorageV3BlockNumber = + StorageValue<_, BlockNumberFor, ValueQuery>; + #[pallet::error] pub enum Error { /// Deprecated: Too many messages are added to existing block @@ -218,6 +238,7 @@ pub mod pallet { provider_msa_id, None, bounded_payload, + schema.intent_id, schema_id, current_block, )? { @@ -285,6 +306,7 @@ pub mod pallet { provider_msa_id, Some(maybe_delegator.into()), bounded_payload, + schema.intent_id, schema_id, current_block, )? { @@ -309,18 +331,20 @@ impl Pallet { provider_msa_id: MessageSourceId, msa_id: Option, payload: BoundedVec, + intent_id: IntentId, schema_id: SchemaId, current_block: BlockNumberFor, ) -> Result { let index = BlockMessageIndex::::get(); let first = index == 0; - let msg = Message { + let msg = MessageV3 { + schema_id, payload, // size is checked on top of extrinsic provider_msa_id, msa_id, }; - >::insert((current_block, schema_id, index), msg); + >::insert((current_block, intent_id, index), msg); BlockMessageIndex::::set(index.saturating_add(1)); Ok(first) } @@ -336,27 +360,42 @@ impl Pallet { .map_err(|_| Error::::InvalidMessageSourceAccount)?) } - /// Gets a messages for a given schema-id and block-number. + /// Gets messages for a given IntentId and block number. /// - /// Payload location is included to map to correct response (To avoid fetching the schema in this method) + /// Payload location is included to map to correct response (To avoid fetching the Intent in this method) /// - /// Result is a vector of [`MessageResponse`]. + /// Result is a vector of [`MessageResponseV2`]. /// - pub fn get_messages_by_schema_and_block( - schema_id: SchemaId, - schema_payload_location: PayloadLocation, + pub fn get_messages_by_intent_and_block( + intent_id: IntentId, + payload_location: PayloadLocation, block_number: BlockNumberFor, - ) -> Vec { - let block_number_value: u32 = block_number.try_into().unwrap_or_default(); + ) -> Vec { + let block_number_value: BlockNumber = block_number.try_into().unwrap_or_default(); - match schema_payload_location { + match payload_location { PayloadLocation::Itemized | PayloadLocation::Paginated => Vec::new(), _ => { - let mut messages: Vec<_> = >::iter_prefix((block_number, schema_id)) - .map(|(index, msg)| { - msg.map_to_response(block_number_value, schema_payload_location, index) - }) - .collect(); + let mut messages: Vec = + if StorageV3BlockNumber::::get().gt(&block_number) { + // Get message from pre-V3 storage. Pre-V3 intent ids equal schema ids. 
+ MessagesV2::::iter_prefix((block_number, intent_id as SchemaId)) + .filter_map(|(index, msg)| { + msg.map_to_response(( + block_number_value, + intent_id as SchemaId, + payload_location, + index, + )) + }) + .collect() + } else { + MessagesV3::::iter_prefix((block_number, intent_id)) + .filter_map(|(index, msg)| { + msg.map_to_response((block_number_value, payload_location, index)) + }) + .collect() + }; messages.sort_by(|a, b| a.index.cmp(&b.index)); messages }, diff --git a/pallets/messages/src/rpc/src/lib.rs b/pallets/messages/src/rpc/src/lib.rs index 2bfa3004fa..b59a13f412 100644 --- a/pallets/messages/src/rpc/src/lib.rs +++ b/pallets/messages/src/rpc/src/lib.rs @@ -93,7 +93,8 @@ where let at = self.client.info().best_hash; // Schema Fetch and Check - let schema: SchemaResponseV2 = match api.get_schema_by_id(at, schema_id) { + #[allow(deprecated)] + let schema: SchemaResponse = match api.get_schema_by_id(at, schema_id) { Ok(Some(s)) => s, _ => fail!(MessageRpcError::InvalidSchemaId), }; @@ -104,6 +105,7 @@ where let mut from_index = pagination.from_index; 'loops: for block_number in from..to { + #[allow(deprecated)] let list: Vec = api .get_messages_by_schema_and_block( at, diff --git a/pallets/messages/src/runtime-api/src/lib.rs b/pallets/messages/src/runtime-api/src/lib.rs index e4e57ee7a9..f3a43cabee 100644 --- a/pallets/messages/src/runtime-api/src/lib.rs +++ b/pallets/messages/src/runtime-api/src/lib.rs @@ -34,10 +34,18 @@ sp_api::decl_runtime_apis! { pub trait MessagesRuntimeApi { /// Retrieve the messages for a particular schema and block number + // TODO: Remove once all RPC nodes have been updated to remove get_messages_by_schema_id RPC method + #[deprecated(note = "Please use get_messages_by_intent_and_block instead")] fn get_messages_by_schema_and_block(schema_id: SchemaId, schema_payload_location: PayloadLocation, block_number: BlockNumber) -> Vec; + /// Retrieve the messages for a particular intent and block range (paginated) + #[api_version(2)] + fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> Result, String>; + /// Retrieve a schema by id - fn get_schema_by_id(schema_id: SchemaId) -> Option; + // TODO: Remove once all RPC nodes have been updated to call the schemas pallet runtime for this + #[deprecated(note = "Use SchemasRuntimeApi_get_schema_by_id instead")] + fn get_schema_by_id(schema_id: SchemaId) -> Option; } } diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index 9f4245c306..6bb42cbcda 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -33,6 +33,8 @@ pub const DUMMY_CID_BASE32: &[u8; 59] = b"bafkreieb2x7yyuhy6hmct4j7tkmgnthrfpqyo4mt5nscx7pvc6oiweiwjq"; pub const DUMMY_CID_BASE64: &[u8; 49] = b"mAVUSIIHV/4xQ+PHYKfE/mphmzPEr4Ydxk+tkK/31F5yLERZM"; +pub const DUMMY_MSA_ID: u64 = 10; + // Configure a mock runtime to test the pallet. 
frame_support::construct_runtime!( pub enum Test diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 9a7ce42e05..8ca35572cc 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -1,4 +1,4 @@ -use crate::{tests::mock::*, BlockMessageIndex, Error, Event as MessageEvent, Message, MessagesV2}; +use crate::{tests::mock::*, BlockMessageIndex, Error, Event as MessageEvent, MapToResponse, Message, MessageV3, MessagesV2}; use common_primitives::{messages::MessageResponse, schema::*}; use frame_support::{assert_err, assert_noop, assert_ok, traits::OnInitialize, BoundedVec}; use frame_system::{EventRecord, Phase}; @@ -11,6 +11,8 @@ use serde::Serialize; use sp_core::ConstU32; extern crate alloc; use alloc::vec::Vec; +use common_primitives::messages::MessageResponseV2; +use crate::pallet::{MessagesV3, StorageV3BlockNumber}; #[derive(Serialize)] #[allow(non_snake_case)] @@ -29,7 +31,7 @@ pub const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4n /// * `message_per_block` - A signed transaction origin from the provider /// * `payload_location` - Determines how a message payload is encoded. PayloadLocation::IPFS /// will encode (mock CID, IPFS_PAYLOAD_LENGTH) on the message payload. -fn populate_messages( +fn populate_messages_v2( schema_id: SchemaId, message_per_block: Vec, payload_location: PayloadLocation, @@ -55,7 +57,51 @@ fn populate_messages( MessagesV2::::set( (idx as u32, schema_id, counter), Some(Message { - msa_id: Some(10), + msa_id: Some(DUMMY_MSA_ID), + payload: payload.clone().try_into().unwrap(), + provider_msa_id: 1, + }), + ); + counter += 1; + } + } +} + +/// Populate mocked Messages storage with message data. +/// +/// # Arguments +/// * `intent_id` - Registered intent id with which to associate messages +/// * `message_per_block` - A signed transaction origin from the provider +/// * `payload_location` - Determines how a message payload is encoded. PayloadLocation::IPFS +/// will encode (mock CID, IPFS_PAYLOAD_LENGTH) on the message payload. 
+fn populate_messages_v3( + intent_id: IntentId, + message_per_block: Vec, + payload_location: PayloadLocation, + cid_in: Option<&[u8]>, +) { + let cid = match cid_in { + Some(val) => val, + None => &DUMMY_CID_BASE32[..], + }; + + let payload = match payload_location { + // Just stick Itemized & Paginated here for coverage; we don't use them for Messages + PayloadLocation::OnChain | PayloadLocation::Itemized | PayloadLocation::Paginated => + generate_payload(1, None), + PayloadLocation::IPFS => + (multibase::decode(core::str::from_utf8(cid).unwrap()).unwrap().1, IPFS_PAYLOAD_LENGTH) + .encode(), + }; + + let mut counter = 0; + for (idx, count) in message_per_block.iter().enumerate() { + for _ in 0..*count { + MessagesV3::::set( + (idx as u32, intent_id, counter), + Some(MessageV3 { + schema_id: intent_id as SchemaId, + msa_id: Some(DUMMY_MSA_ID), payload: payload.clone().try_into().unwrap(), provider_msa_id: 1, }), @@ -88,6 +134,10 @@ fn generate_payload(num_items: u8, content_len: Option) -> Vec { result_str.as_bytes().to_vec() } +fn set_v3_block(block_number: u32) { + StorageV3BlockNumber::::set(block_number); +} + #[test] fn add_message_should_store_message_in_storage() { new_test_ext().execute_with(|| { @@ -123,13 +173,14 @@ fn add_message_should_store_message_in_storage() { )); // assert messages - let msg1 = MessagesV2::::get((1, schema_id_1, 0u16)); - let msg2 = MessagesV2::::get((1, schema_id_2, 1u16)); - let msg3 = MessagesV2::::get((1, schema_id_2, 2u16)); + let msg1 = MessagesV3::::get((1, schema_id_1, 0u16)); + let msg2 = MessagesV3::::get((1, schema_id_2, 1u16)); + let msg3 = MessagesV3::::get((1, schema_id_2, 2u16)); assert_eq!( msg1, - Some(Message { + Some(MessageV3 { + schema_id: schema_id_1, msa_id: Some(get_msa_from_account(caller_1)), payload: message_payload_1.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_1) @@ -138,7 +189,8 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg2, - Some(Message { + Some(MessageV3 { + schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_2.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_2) @@ -147,7 +199,8 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg3, - Some(Message { + Some(MessageV3 { + schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_3.try_into().unwrap(), provider_msa_id: get_msa_from_account(caller_2) @@ -231,53 +284,56 @@ fn add_ipfs_message_with_invalid_msa_account_errors() { /// Assert that MessageResponse for IPFS messages returns the payload_length of the offchain message. 
#[test] -fn get_messages_by_schema_with_ipfs_payload_location_should_return_offchain_payload_length() { +fn get_messages_by_intent_with_ipfs_payload_location_should_return_offchain_payload_length() { new_test_ext().execute_with(|| { // Setup - let schema_id: SchemaId = IPFS_SCHEMA_ID; - let current_block = 1; + let intent_id = 1 as IntentId; + let current_block = 0; // Populate - populate_messages(schema_id, vec![1], PayloadLocation::IPFS, None); + populate_messages_v3(intent_id, vec![1], PayloadLocation::IPFS, None); // Run to the block + + set_v3_block(current_block); run_to_block(current_block + 1); let list = - MessagesPallet::get_messages_by_schema_and_block(schema_id, PayloadLocation::IPFS, 0); + MessagesPallet::get_messages_by_intent_and_block(intent_id, PayloadLocation::IPFS, 0); // IPFS messages should return the payload length that was encoded in a tuple along // with the CID: (cid, payload_length). assert_eq!(list.len(), 1); assert_eq!( list[0], - MessageResponse { + MessageResponseV2 { + schema_id: intent_id, payload: None, index: 0, provider_msa_id: 1, block_number: 0, payload_length: Some(IPFS_PAYLOAD_LENGTH), - msa_id: None, - cid: Some(DUMMY_CID_BASE32.to_vec()) + msa_id: Some(DUMMY_MSA_ID), + cid: Some(DUMMY_CID_BASE32.to_vec()), + ..Default::default() } ); }); } #[test] -fn retrieved_ipfs_message_should_always_be_in_base32() { +fn retrieved_ipfs_message_cid_should_always_be_in_base32() { new_test_ext().execute_with(|| { - let schema_id = IPFS_SCHEMA_ID; + let intent_id = 1 as IntentId; let current_block: u32 = 1; // Populate message storage using Base64-encoded CID - populate_messages(schema_id, vec![1], PayloadLocation::IPFS, Some(DUMMY_CID_BASE64)); + populate_messages_v3(intent_id, vec![1], PayloadLocation::IPFS, Some(DUMMY_CID_BASE64)); // Run to the block run_to_block(current_block + 1); let list = - MessagesPallet::get_messages_by_schema_and_block(schema_id, PayloadLocation::IPFS, 0); + MessagesPallet::get_messages_by_intent_and_block(intent_id, PayloadLocation::IPFS, 0); assert_eq!(list[0].cid.as_ref().unwrap(), &DUMMY_CID_BASE32.to_vec()); }) @@ -294,7 +350,7 @@ fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { msa_id: Some(0), provider_msa_id: 1, }; - let mapped_response = bad_message.map_to_response(0, PayloadLocation::IPFS, 0); + let mapped_response = bad_message.map_to_response((0, PayloadLocation::IPFS, 0)).unwrap(); assert_eq!( mapped_response.cid, Some(multibase::encode(Base::Base32Lower, Vec::new()).as_bytes().to_vec()) @@ -594,9 +650,9 @@ fn validate_cid_unwrap_panics() { fn map_to_response_on_chain() { let payload_vec = b"123456789012345678901234567890".to_vec(); let payload_bounded = BoundedVec::>::try_from(payload_vec.clone()).unwrap(); - let msg = Message { payload: payload_bounded, provider_msa_id: 10u64, msa_id: None }; + let msg = Message { payload: payload_bounded, provider_msa_id: DUMMY_MSA_ID, msa_id: None }; let expected = MessageResponse { - provider_msa_id: 10u64, + provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, msa_id: None, @@ -604,7 +660,7 @@ fn map_to_response_on_chain() { cid: None, payload_length: None, }; - assert_eq!(msg.map_to_response(42, PayloadLocation::OnChain, 1), expected); + assert_eq!(msg.map_to_response((42, PayloadLocation::OnChain, 1)).unwrap(), expected); } #[test] @@ -614,7 +670,7 @@ fn map_to_response_ipfs() { let payload = BoundedVec::>::try_from(payload_tuple.encode()).unwrap(); let msg = Message { payload, provider_msa_id: 10u64, msa_id: None }; let expected = 
MessageResponse { - provider_msa_id: 10u64, + provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, msa_id: None, @@ -622,5 +678,5 @@ fn map_to_response_ipfs() { cid: Some(cid.as_bytes().to_vec()), payload_length: Some(10), }; - assert_eq!(msg.map_to_response(42, PayloadLocation::IPFS, 1), expected); + assert_eq!(msg.map_to_response((42, PayloadLocation::IPFS, 1)).unwrap(), expected); } diff --git a/pallets/messages/src/types.rs b/pallets/messages/src/types.rs index 2d92af899d..5f3747c26e 100644 --- a/pallets/messages/src/types.rs +++ b/pallets/messages/src/types.rs @@ -1,5 +1,8 @@ use common_primitives::{ - messages::MessageResponse, msa::MessageSourceId, node::BlockNumber, schema::PayloadLocation, + messages::MessageResponse, + msa::MessageSourceId, + node::BlockNumber, + schema::{PayloadLocation, SchemaId}, }; use core::fmt::Debug; use frame_support::{traits::Get, BoundedVec}; @@ -8,6 +11,7 @@ use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use scale_info::TypeInfo; extern crate alloc; use alloc::vec::Vec; +use common_primitives::messages::MessageResponseV2; /// Payloads stored offchain contain a tuple of (bytes(the payload reference), payload length). pub type OffchainPayloadType = (Vec, u32); @@ -33,49 +37,148 @@ where pub msa_id: Option, } -impl Message +/// A single message type definition. +#[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] +#[scale_info(skip_type_params(MaxDataSize))] +#[codec(mel_bound(MaxDataSize: MaxEncodedLen))] +pub struct MessageV3 where MaxDataSize: Get + Debug, { - /// Helper function to handle response type [`MessageResponse`] depending on the Payload Location (on chain or IPFS) - pub fn map_to_response( + /// Data structured by the associated schema's model. + pub payload: BoundedVec, + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Message source account id (the original source). 
+ pub msa_id: Option, + /// The SchemaId of the schema that defines the format of the payload + pub schema_id: SchemaId, +} + +/// Trait for converting message storage to response type +pub trait MapToResponse { + /// Maps a stored message to an RPC response + fn map_to_response(&self, index_values: I) -> Option; +} + +impl + Debug> + MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponse> for Message +{ + fn map_to_response( &self, - block_number: BlockNumber, - payload_location: PayloadLocation, - index: u16, - ) -> MessageResponse { + index_values: (BlockNumber, PayloadLocation, u16), + ) -> Option { + let (block_number, payload_location, index) = index_values; + let base_response = MessageResponse { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + ..Default::default() + }; + match payload_location { - PayloadLocation::OnChain => MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, - msa_id: self.msa_id, + PayloadLocation::OnChain => Some(MessageResponse { payload: Some(self.payload.to_vec()), cid: None, payload_length: None, - }, + ..base_response + }), PayloadLocation::IPFS => { let (binary_cid, payload_length) = OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); - MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, + Some(MessageResponse { cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), payload_length: Some(payload_length), - msa_id: None, payload: None, - } + ..base_response + }) }, // Message types of Itemized and Paginated are retrieved differently - _ => MessageResponse { - provider_msa_id: self.provider_msa_id, - index, - block_number, - msa_id: None, - payload: None, + _ => None, + } + } +} + +impl + Debug> + MapToResponse<(BlockNumber, SchemaId, PayloadLocation, u16), MessageResponseV2> + for Message +{ + /// Helper function to handle response type [`MessageResponseV2`] depending on the Payload Location (on chain or IPFS) + fn map_to_response( + &self, + index_values: (BlockNumber, SchemaId, PayloadLocation, u16), + ) -> Option { + let (block_number, schema_id, payload_location, index) = index_values; + let base_response = MessageResponseV2 { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + schema_id, + ..Default::default() + }; + + match payload_location { + PayloadLocation::OnChain => Some(MessageResponseV2 { + payload: Some(self.payload.to_vec()), + cid: None, + payload_length: None, + ..base_response + }), + PayloadLocation::IPFS => { + let (binary_cid, payload_length) = + OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); + Some(MessageResponseV2 { + cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), + payload_length: Some(payload_length), + payload: None, + ..base_response + }) + }, // Message types of Itemized and Paginated are retrieved differently + _ => None, + } + } +} + +impl + Debug> + MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponseV2> for MessageV3 +{ + /// Helper function to handle response type [`MessageResponseV2`] depending on the Payload Location (on chain or IPFS) + fn map_to_response( + &self, + index_values: (BlockNumber, PayloadLocation, u16), + ) -> Option { + let (block_number, payload_location, index) = index_values; + let base_response = MessageResponseV2 { + provider_msa_id: self.provider_msa_id, + index, + block_number, + msa_id: self.msa_id, + schema_id: 
self.schema_id, + ..Default::default() + }; + + match payload_location { + PayloadLocation::OnChain => Some(MessageResponseV2 { + payload: Some(self.payload.to_vec()), cid: None, payload_length: None, - }, + ..base_response + }), + PayloadLocation::IPFS => { + let (binary_cid, payload_length) = + OffchainPayloadType::decode(&mut &self.payload[..]).unwrap_or_default(); + Some(MessageResponseV2 { + cid: Some(multibase::encode(Base::Base32Lower, binary_cid).as_bytes().to_vec()), + payload_length: Some(payload_length), + payload: None, + ..base_response + }) + }, // Message types of Itemized and Paginated are retrieved differently + _ => None, } } } From d6a6a04b31cc4961a683a9b54212ed04649765de Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Mon, 29 Sep 2025 13:06:09 -0400 Subject: [PATCH 2/7] feat: multi-block migration for messages pallet using pallet-migrations --- Cargo.lock | 2 + Cargo.toml | 1 + Makefile | 19 +- pallets/messages/Cargo.toml | 1 + pallets/messages/src/benchmarking.rs | 123 ++++++++----- pallets/messages/src/lib.rs | 81 +++------ pallets/messages/src/migration/mod.rs | 7 + pallets/messages/src/migration/v2/mod.rs | 40 +++++ pallets/messages/src/migration/v3.rs | 173 ++++++++++++++++++ pallets/messages/src/rpc/src/tests/mod.rs | 6 +- pallets/messages/src/runtime-api/src/lib.rs | 2 +- pallets/messages/src/tests/mock.rs | 11 +- pallets/messages/src/tests/other_tests.rs | 185 ++++++-------------- pallets/messages/src/types.rs | 21 +-- pallets/messages/src/weights.rs | 48 +++++ runtime/frequency/Cargo.toml | 4 + runtime/frequency/src/lib.rs | 106 ++++++++++- 17 files changed, 552 insertions(+), 278 deletions(-) create mode 100644 pallets/messages/src/migration/mod.rs create mode 100644 pallets/messages/src/migration/v2/mod.rs create mode 100644 pallets/messages/src/migration/v3.rs diff --git a/Cargo.lock b/Cargo.lock index 6163f3c145..cf8f07a1d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3941,6 +3941,7 @@ dependencies = [ "pallet-message-queue", "pallet-messages", "pallet-messages-runtime-api", + "pallet-migrations", "pallet-msa", "pallet-msa-runtime-api", "pallet-multisig", @@ -7786,6 +7787,7 @@ dependencies = [ "frame-benchmarking", "frame-support", "frame-system", + "log", "multibase", "parity-scale-codec", "pretty_assertions", diff --git a/Cargo.toml b/Cargo.toml index 210161c68b..a85854bc78 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ pallet-authorship = { git = "https://github.com/paritytech/polkadot-sdk", tag = pallet-balances = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-collective = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-democracy = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } +pallet-migrations = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-multisig = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-preimage = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } pallet-scheduler = { git = "https://github.com/paritytech/polkadot-sdk", tag = "polkadot-stable2503-7", default-features = false } diff --git a/Makefile b/Makefile index e29e8672c2..f6ae827851 100644 --- a/Makefile +++ b/Makefile @@ -419,6 +419,7 @@ LOCAL_URI=ws://localhost:9944 
WASM_PATH=./target/release/wbuild/frequency-runtime/frequency_runtime.wasm # Without the state from this minimal set of pallets, try-runtime panics when trying to validate multi-block migrations MINIMAL_PALLETS=ParachainSystem ParachainInfo System Timestamp Aura Authorship +TRY_RUNTIME_BUILD_TYPE=release .PHONY: check-onfinality-api-key check-onfinality-api-key: @@ -440,25 +441,30 @@ try-runtime-%-mainnet: URI := $(MAINNET_URI) try-runtime-%-mainnet: CHAIN := mainnet try-runtime-%-local: URI := $(LOCAL_URI) try-runtime-%-local: CHAIN := local +try-runtime-%-local: WASM_PATH=./target/debug/wbuild/frequency-runtime/frequency_runtime.wasm + build-runtime-paseo-testnet: FEATURES := frequency-testnet build-runtime-bridging-testnet: FEATURES := frequency-testnet,frequency-bridging build-runtime-mainnet: FEATURES := frequency build-runtime-westend-testnet: FEATURES := frequency-westend,frequency-bridging -build-runtime-local: FEATURES := frequency-no-relay,frequency-bridging +build-runtime-local: FEATURES := frequency-no-relay +build-runtime-local: TRY_RUNTIME_BUILD_TYPE := dev .PHONY: build-runtime-paseo-testnet build-runtime-westend-testnet build-runtime-mainnet build-runtime-local +build-runtime-local \ build-runtime-paseo-testnet \ build-runtime-westend-testnet \ build-runtime-mainnet: - cargo build --package frequency-runtime --release --features $(FEATURES),try-runtime --locked + cargo build --package frequency-runtime --profile ${TRY_RUNTIME_BUILD_TYPE} --features $(FEATURES),try-runtime --locked # # The 'try-runtime' targets can optionally be constrained to fetch state for only specific pallets. This is useful to # avoid unnecessarily fetching large state trees for pallets not under test. The list of pallets is: # Msa Messages StatefulStorage Capacity FrequencyTxPayment Handles Passkey Schemas -.PHONY: try-runtime-create-snapshot-paseo-testnet try-runtime-create-snapshot-westend-testnet try-runtime-create-snapshot-mainnet +.PHONY: try-runtime-create-snapshot-paseo-testnet try-runtime-create-snapshot-westend-testnet try-runtime-create-snapshot-mainnet try-runtime-create-snapshot-local +try-runtime-create-snapshot-local \ try-runtime-create-snapshot-paseo-testnet \ try-runtime-create-snapshot-westend-testnet \ try-runtime-create-snapshot-mainnet: check-try-runtime-installed check-onfinality-api-key @@ -470,7 +476,8 @@ try-runtime-upgrade-paseo-testnet \ try-runtime-upgrade-mainnet: try-runtime-upgrade-%: check-try-runtime-installed build-runtime-% try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 live --uri $(URI) -.PHONY: try-runtime-use-snapshot-paseo-testnet try-runtime-use-snapshot-mainnet +.PHONY: try-runtime-use-snapshot-paseo-testnet try-runtime-use-snapshot-mainnet try-runtime-use-snapshot-local +try-runtime-use-snapshot-local \ try-runtime-use-snapshot-paseo-testnet \ try-runtime-use-snapshot-mainnet: try-runtime-use-snapshot-%: check-try-runtime-installed build-runtime-% try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 snap --path $(CHAIN)-$(SNAPSHOT_PALLETS).state @@ -483,11 +490,11 @@ try-runtime-check-migrations-westend-testnet: try-runtime-check-migrations-%: ch .PHONY: try-runtime-check-migrations-local try-runtime-check-migrations-local: check-try-runtime-installed build-runtime-local - try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="pre-and-post" --disable-spec-version-check --disable-mbm-checks --no-weight-warnings live --uri $(URI) $(PALLET_FLAGS) + try-runtime --runtime $(WASM_PATH) 
on-runtime-upgrade --blocktime=6000 --checks="pre-and-post" --disable-spec-version-check live --uri $(URI) $(PALLET_FLAGS) .PHONY: try-runtime-check-migrations-none-local try-runtime-check-migrations-none-local: check-try-runtime-installed build-runtime-local - try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="none" --disable-spec-version-check --disable-mbm-checks --no-weight-warnings live --uri $(URI) $(PALLET_FLAGS) + try-runtime --runtime $(WASM_PATH) on-runtime-upgrade --blocktime=6000 --checks="none" --disable-spec-version-check live --uri $(URI) $(PALLET_FLAGS) # Pull the Polkadot version from the polkadot-cli package in the Cargo.lock file. # This will break if the lock file format changes diff --git a/pallets/messages/Cargo.toml b/pallets/messages/Cargo.toml index fbf0fe3aa3..54b77c0bf8 100644 --- a/pallets/messages/Cargo.toml +++ b/pallets/messages/Cargo.toml @@ -14,6 +14,7 @@ targets = ["x86_64-unknown-linux-gnu"] [dependencies] parity-scale-codec = { workspace = true, features = ["derive"] } +log = { workspace = true } scale-info = { workspace = true, features = ["derive"] } # Substrate frame-benchmarking = { workspace = true, optional = true } diff --git a/pallets/messages/src/benchmarking.rs b/pallets/messages/src/benchmarking.rs index 4b99316cd7..1ad7c9b0c9 100644 --- a/pallets/messages/src/benchmarking.rs +++ b/pallets/messages/src/benchmarking.rs @@ -1,27 +1,26 @@ #![allow(clippy::expect_used)] use super::*; -#[allow(unused)] use crate::Pallet as MessagesPallet; use common_primitives::{ msa::{DelegatorId, ProviderId}, schema::*, }; use frame_benchmarking::v2::*; -use frame_support::{assert_ok, pallet_prelude::DispatchResult}; +use frame_support::{ + assert_ok, migrations::SteppedMigration, pallet_prelude::DispatchResult, weights::WeightMeter, +}; use frame_system::{pallet_prelude::BlockNumberFor, RawOrigin}; use sp_runtime::traits::One; extern crate alloc; use alloc::vec; -const SCHEMA_SIZE: u16 = 50; const IPFS_PAYLOAD_LENGTH: u32 = 10; const MAX_MESSAGES_IN_BLOCK: u32 = 500; -const ON_CHAIN_SCHEMA_ID: u16 = 16001; -// this value should be the same as the one used in mocks tests +// Any IPFS schema ID from the [pallet genesis file](../../../resources/genesis-schemas.json) will work const IPFS_SCHEMA_ID: u16 = 20; -fn onchain_message(schema_id: SchemaId) -> DispatchResult { +fn onchain_message(intent_id: IntentId, schema_id: SchemaId) -> DispatchResult { let message_source_id = DelegatorId(1); let provider_id = ProviderId(1); let payload = Vec::from( @@ -33,7 +32,7 @@ fn onchain_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), Some(message_source_id.into()), bounded_payload, - schema_id, // intent_id + intent_id, schema_id, BlockNumberFor::::one(), )?; @@ -41,7 +40,7 @@ fn onchain_message(schema_id: SchemaId) -> DispatchResult { } /// Helper function to call MessagesPallet::::add_ipfs_message -fn ipfs_message(schema_id: SchemaId) -> DispatchResult { +fn ipfs_message(intent_id: IntentId, schema_id: SchemaId) -> DispatchResult { let payload = Vec::from("bafkreidgvpkjawlxz6sffxzwgooowe5yt7i6wsyg236mfoks77nywkptdq".as_bytes()); let provider_id = ProviderId(1); @@ -52,7 +51,7 @@ fn ipfs_message(schema_id: SchemaId) -> DispatchResult { provider_id.into(), None, bounded_payload, - schema_id, // intent_id + intent_id, schema_id, BlockNumberFor::::one(), )?; @@ -60,28 +59,10 @@ fn ipfs_message(schema_id: SchemaId) -> DispatchResult { Ok(()) } -fn create_intent_and_schema(location: PayloadLocation) -> DispatchResult { - let 
intent_id = T::SchemaBenchmarkHelper::create_intent( - b"benchmark.intent".to_vec(), - location, - Vec::default(), - )?; - let _ = T::SchemaBenchmarkHelper::create_schema( - intent_id, - Vec::from(r#"{"Name": "Bond", "Code": "007"}"#.as_bytes()), - ModelType::AvroBinary, - location, - )?; - Ok(()) -} - -fn set_storage_v3_block(block_number: BlockNumberFor) { - StorageV3BlockNumber::::put(block_number); -} - #[benchmarks] mod benchmarks { use super::*; + use frame_support::{pallet_prelude::StorageVersion, traits::GetStorageVersion}; #[benchmark] fn add_onchain_message( @@ -89,15 +70,20 @@ mod benchmarks { ) -> Result<(), BenchmarkError> { let message_source_id = DelegatorId(2); let caller: T::AccountId = whitelisted_caller(); - let schema_id = ON_CHAIN_SCHEMA_ID; - let intent_id = ON_CHAIN_SCHEMA_ID as IntentId; - - set_storage_v3_block::(BlockNumberFor::::one()); - - // schema ids start from 1, and we need to add that many to make sure our desired id exists - for _ in 0..=SCHEMA_SIZE { - assert_ok!(create_intent_and_schema::(PayloadLocation::OnChain)); - } + + // The schemas pallet genesis does not contain an OnChain intent/schema, so we create one here + let intent_id = T::SchemaBenchmarkHelper::create_intent( + b"benchmark.onchain-intent".to_vec(), + PayloadLocation::OnChain, + Vec::default(), + )?; + + let schema_id = T::SchemaBenchmarkHelper::create_schema( + intent_id, + Vec::from(r#"{"Name": "Bond", "Code": "007"}"#.as_bytes()), + ModelType::AvroBinary, + PayloadLocation::OnChain, + )?; assert_ok!(T::MsaBenchmarkHelper::add_key(ProviderId(1).into(), caller.clone())); assert_ok!(T::MsaBenchmarkHelper::set_delegation_relationship( @@ -108,7 +94,7 @@ mod benchmarks { let payload = vec![1; n as usize]; for _ in 1..MAX_MESSAGES_IN_BLOCK { - assert_ok!(onchain_message::(schema_id)); + assert_ok!(onchain_message::(intent_id, schema_id)); } #[extrinsic_call] @@ -136,15 +122,9 @@ mod benchmarks { let schema_id = IPFS_SCHEMA_ID; let intent_id = IPFS_SCHEMA_ID as IntentId; - set_storage_v3_block::(BlockNumberFor::::one()); - - // schema ids start from 1, and we need to add that many to make sure our desired id exists - for _ in 0..=SCHEMA_SIZE { - assert_ok!(create_intent_and_schema::(PayloadLocation::IPFS)); - } assert_ok!(T::MsaBenchmarkHelper::add_key(ProviderId(1).into(), caller.clone())); for _ in 1..MAX_MESSAGES_IN_BLOCK { - assert_ok!(ipfs_message::(schema_id)); + assert_ok!(ipfs_message::(intent_id, schema_id)); } #[extrinsic_call] @@ -162,6 +142,59 @@ mod benchmarks { Ok(()) } + /// Benchmark a single step of the `v3::MigrateV2ToV3` migration. Here we benchmark the cost + /// to migrate a _single record_. This weight is then used in the migration itself for self-metering. 
+ #[benchmark] + fn v2_to_v3_step() { + let payload: BoundedVec = + vec![1; 3072].try_into().expect("Unable to create BoundedVec payload"); + let old_message = migration::v2::Message { + payload: payload.clone(), + provider_msa_id: 1u64, + msa_id: Some(1u64), + }; + migration::v2::MessagesV2::::insert( + (BlockNumberFor::::default(), 0u16, 0u16), + old_message, + ); + let mut meter = WeightMeter::new(); + + #[block] + { + migration::v3::MigrateV2ToV3::>::step(None, &mut meter) + .expect("migration call failed"); + } + + // Check that the new storage is decodable: + let new_message = + crate::MessagesV3::::try_get((BlockNumberFor::::default(), 0u16, 0u16)); + assert!(new_message.is_ok()); + let new_message = new_message.expect("Unable to fetch migrated message"); + assert_eq!(Some(1u64), new_message.msa_id); + assert_eq!(1u64, new_message.provider_msa_id); + assert_eq!(payload, new_message.payload); + assert_eq!(new_message.schema_id, 0u16); + } + + /// Benchmark a single step of the `v3::FinalizeV3Migration` migration. Here we benchmark the cost + /// to migrate a _single record_. This weight is then used in the migration itself for self-metering. + #[benchmark] + fn v2_to_v3_final_step() { + let mut meter = WeightMeter::new(); + + #[block] + { + migration::v3::FinalizeV3Migration::>::step( + None, &mut meter, + ) + .expect("final storage version migration failed"); + } + + // Check that the storage version was correctly set + let new_version = Pallet::::on_chain_storage_version(); + assert_eq!(new_version, StorageVersion::new(3)); + } + impl_benchmark_test_suite!( MessagesPallet, crate::tests::mock::new_test_ext(), diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs index 02f71e93cc..0eac173e4a 100644 --- a/pallets/messages/src/lib.rs +++ b/pallets/messages/src/lib.rs @@ -29,11 +29,16 @@ pub mod weights; mod types; +/// Storage migrations +pub mod migration; + use core::{convert::TryInto, fmt::Debug}; use frame_support::{ensure, pallet_prelude::Weight, traits::Get, BoundedVec}; use sp_runtime::DispatchError; extern crate alloc; +extern crate core; + use alloc::vec::Vec; use common_primitives::{ messages::*, @@ -104,20 +109,7 @@ pub mod pallet { #[pallet::whitelist_storage] pub(super) type BlockMessageIndex = StorageValue<_, MessageIndex, ValueQuery>; - /// Storage for messages in STORAGE_VERSION(2) and lower - #[pallet::storage] - pub(super) type MessagesV2 = StorageNMap< - _, - ( - storage::Key>, - storage::Key, - storage::Key, - ), - Message, - OptionQuery, - >; - - /// Storage for messages in STORAGE_VERSION(3) and higher + /// Storage for messages #[pallet::storage] pub(super) type MessagesV3 = StorageNMap< _, @@ -126,15 +118,10 @@ pub mod pallet { storage::Key, storage::Key, ), - MessageV3, + Message, OptionQuery, >; - /// The block number at which the STORAGE_VERSION(3) migration was completed - #[pallet::storage] - pub(super) type StorageV3BlockNumber = - StorageValue<_, BlockNumberFor, ValueQuery>; - #[pallet::error] pub enum Error { /// Deprecated: Too many messages are added to existing block @@ -193,7 +180,7 @@ pub mod pallet { #[pallet::call] impl Pallet { /// Adds a message for a resource hosted on IPFS. The input consists of - /// both a Base32-encoded [CID](https://docs.ipfs.tech/concepts/content-addressing/#version-1-v1) + /// a Base32-encoded [CID](https://docs.ipfs.tech/concepts/content-addressing/#version-1-v1) /// as well as a 32-bit content length. 
The stored payload will contain the /// CID encoded as binary, as well as the 32-bit message content length. /// The actual message content will be on IPFS. @@ -202,13 +189,13 @@ pub mod pallet { /// * [`Event::MessagesInBlock`] - Messages Stored in the block /// /// # Errors - /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large - /// * [`Error::InvalidSchemaId`] - Schema not found - /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location - /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA - /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full - /// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0) - /// * [`Error::InvalidCid`] - Unable to parse provided CID + /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large. + /// * [`Error::InvalidSchemaId`] - Schema not found. + /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location. + /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA. + /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full. + /// * [`Error::UnsupportedCidVersion`] - CID version is not supported (V0). + /// * [`Error::InvalidCid`] - Unable to parse provided CID. /// #[pallet::call_index(0)] #[pallet::weight(T::WeightInfo::add_ipfs_message())] @@ -256,12 +243,12 @@ pub mod pallet { /// * [`Event::MessagesInBlock`] - In the next block /// /// # Errors - /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large - /// * [`Error::InvalidSchemaId`] - Schema not found - /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location - /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA - /// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA - /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full + /// * [`Error::ExceedsMaxMessagePayloadSizeBytes`] - Payload is too large. + /// * [`Error::InvalidSchemaId`] - Schema not found. + /// * [`Error::InvalidPayloadLocation`] - The schema is not an IPFS payload location. + /// * [`Error::InvalidMessageSourceAccount`] - Origin must be from an MSA. + /// * [`Error::UnAuthorizedDelegate`] - Trying to add a message without a proper delegation between the origin and the on_behalf_of MSA. + /// * [`Error::TypeConversionOverflow`] - Failed to add the message to storage as it is very full. /// #[pallet::call_index(1)] #[pallet::weight(T::WeightInfo::add_onchain_message(payload.len() as u32))] @@ -337,7 +324,7 @@ impl Pallet { ) -> Result { let index = BlockMessageIndex::::get(); let first = index == 0; - let msg = MessageV3 { + let msg = Message { schema_id, payload, // size is checked on top of extrinsic provider_msa_id, @@ -377,25 +364,11 @@ impl Pallet { PayloadLocation::Itemized | PayloadLocation::Paginated => Vec::new(), _ => { let mut messages: Vec = - if StorageV3BlockNumber::::get().gt(&block_number) { - // Get message from pre-V3 storage. Pre-V3 intent ids equal schema ids. 
- MessagesV2::::iter_prefix((block_number, intent_id as SchemaId)) - .filter_map(|(index, msg)| { - msg.map_to_response(( - block_number_value, - intent_id as SchemaId, - payload_location, - index, - )) - }) - .collect() - } else { - MessagesV3::::iter_prefix((block_number, intent_id)) - .filter_map(|(index, msg)| { - msg.map_to_response((block_number_value, payload_location, index)) - }) - .collect() - }; + MessagesV3::::iter_prefix((block_number, intent_id)) + .filter_map(|(index, msg)| { + msg.map_to_response((block_number_value, payload_location, index)) + }) + .collect(); messages.sort_by(|a, b| a.index.cmp(&b.index)); messages }, diff --git a/pallets/messages/src/migration/mod.rs b/pallets/messages/src/migration/mod.rs new file mode 100644 index 0000000000..77b7d1f2e3 --- /dev/null +++ b/pallets/messages/src/migration/mod.rs @@ -0,0 +1,7 @@ +/// Migration module for migrating from V2 to V3 +pub mod v2; + +/// Migration module for migrating from V2 to V3 +pub mod v3; + +pub use v3::{FinalizeV3Migration, MigrateV2ToV3}; diff --git a/pallets/messages/src/migration/v2/mod.rs b/pallets/messages/src/migration/v2/mod.rs new file mode 100644 index 0000000000..4726ecf676 --- /dev/null +++ b/pallets/messages/src/migration/v2/mod.rs @@ -0,0 +1,40 @@ +use crate::{Config, MessageIndex, Pallet}; +use common_primitives::{msa::MessageSourceId, schema::SchemaId}; +use core::fmt::Debug; +use frame_support::{pallet_prelude::*, storage_alias}; +use frame_system::pallet_prelude::BlockNumberFor; +use sp_runtime::codec::{Decode, Encode}; + +/// Storage for messages in v2 and lower +/// - Key: (block_number, schema_id, message_index) +/// - Value: Message +#[storage_alias] +pub(crate) type MessagesV2 = StorageNMap< + Pallet, + ( + storage::Key>, + storage::Key, + storage::Key, + ), + Message<::MessagesMaxPayloadSizeBytes>, + OptionQuery, +>; + +/// A single message type definition for V2 storage +#[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] +#[scale_info(skip_type_params(MaxDataSize))] +#[codec(mel_bound(MaxDataSize: MaxEncodedLen))] +pub struct Message +where + MaxDataSize: Get + Debug, +{ + /// Data structured by the associated schema's model. + pub payload: BoundedVec, + /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, + /// indicating that the original source MSA is acting as its own provider. An id differing from that + /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on + /// its behalf. + pub provider_msa_id: MessageSourceId, + /// Message source account id (the original source). 
+ pub msa_id: Option, +} diff --git a/pallets/messages/src/migration/v3.rs b/pallets/messages/src/migration/v3.rs new file mode 100644 index 0000000000..0432eb085b --- /dev/null +++ b/pallets/messages/src/migration/v3.rs @@ -0,0 +1,173 @@ +use crate::{migration::v2, pallet::MessagesV3, weights, Config, Message, MessageIndex, Pallet}; +#[cfg(feature = "try-runtime")] +use alloc::vec::Vec; +use common_primitives::schema::SchemaId; +use core::marker::PhantomData; +use frame_support::{ + migrations::{MigrationId, SteppedMigration, SteppedMigrationError}, + pallet_prelude::StorageVersion, + weights::WeightMeter, +}; +use frame_system::pallet_prelude::BlockNumberFor; +#[cfg(feature = "try-runtime")] +use parity_scale_codec::{Decode, Encode}; + +const LOG_TARGET: &str = "pallet::messages::migration::v3"; + +fn convert_from_old( + id: SchemaId, + old_info: v2::Message, +) -> Message { + Message { + schema_id: id, + payload: old_info.payload, + provider_msa_id: old_info.provider_msa_id, + msa_id: old_info.msa_id, + } +} + +/// Migrates the items of the [`v2::MessagesV2`] map to [`crate::MessagesV3`] +/// +/// The `step` function will be called once per block. It is very important that this function +/// *never* panics and never uses more weight than it got in its meter. The migrations should also +/// try to make maximal progress per step, so that the total time it takes to migrate stays low. +pub struct MigrateV2ToV3(PhantomData<(T, W)>); +impl SteppedMigration for MigrateV2ToV3 { + type Cursor = (BlockNumberFor, SchemaId, MessageIndex); + // Without the explicit length here the construction of the ID would not be infallible. + type Identifier = MigrationId<31>; + + /// The identifier of this migration. Which should be globally unique. + fn id() -> Self::Identifier { + MigrationId { + pallet_id: *b"pallet::messages::migration::v3", + version_from: 2, + version_to: 3, + } + } + + /// The actual logic of the migration. + /// + /// This function is called repeatedly until it returns `Ok(None)`, indicating that the + /// migration is complete. Ideally, the migration should be designed in such a way that each + /// step consumes as much weight as possible. + fn step( + cursor: Option, + meter: &mut WeightMeter, + ) -> Result, SteppedMigrationError> { + let required = W::v2_to_v3_step(); + // If there is not enough weight for a single step, return an error. This case can be + // problematic if it is the first migration that ran in this block. But there is nothing + // that we can do about it here. + if meter.remaining().any_lt(required) { + return Err(SteppedMigrationError::InsufficientWeight { required }); + } + + let mut count = 0u32; + + let mut iter = v2::MessagesV2::::drain(); + let mut last_cursor = cursor; + + // We loop here to do as much progress as possible per step. + loop { + if meter.try_consume(required).is_err() { + break; + } + + // If there's a next item in the iterator, perform the migration. + if let Some(((block_number, schema_id, index), value)) = iter.next() { + count += 1; + // Migrate the inner value to the new structure + let value = convert_from_old::(schema_id, value); + // We can just insert here since the old and the new map share the same key-space. 
+ MessagesV3::::insert((block_number, schema_id, index), value); + last_cursor = Some((block_number, schema_id, index)); + } else { + log::info!(target: LOG_TARGET, "Migrated final {} messages", count); + return Ok(None); + } + } + + log::info!(target: LOG_TARGET, "Migrated {} messages", count); + Ok(last_cursor) + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result, frame_support::sp_runtime::TryRuntimeError> { + // Return the state of the storage before the migration. + Ok((v2::MessagesV2::::iter().count() as u32).encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(prev: Vec) -> Result<(), frame_support::sp_runtime::TryRuntimeError> { + // Check the state of the storage after the migration. + let prev_messages = + ::decode(&mut &prev[..]).expect("Failed to decode the previous storage state"); + + // Check the len of prev and post are the same. + assert_eq!( + MessagesV3::::iter().count() as u32, + prev_messages, + "Migration failed: the number of items in the storage after the migration is not the same as before" + ); + + Ok(()) + } +} + +/// Finalize the migration of [`v2::MessagesV2`] map to [`crate::MessagesV3`] +/// by updating the pallet storage version. +pub struct FinalizeV3Migration(PhantomData<(T, W)>); +impl SteppedMigration for FinalizeV3Migration { + type Cursor = (BlockNumberFor, SchemaId, MessageIndex); + // Without the explicit length here the construction of the ID would not be infallible. + type Identifier = MigrationId<40>; + + /// The identifier of this migration. Which should be globally unique. + fn id() -> Self::Identifier { + MigrationId { + pallet_id: *b"pallet::messages::migration::v3-finalize", + version_from: 2, + version_to: 3, + } + } + + /// Final migration step + fn step( + _cursor: Option, + meter: &mut WeightMeter, + ) -> Result, SteppedMigrationError> { + let required = W::v2_to_v3_final_step(); + // If there is not enough weight for a single step, return an error. This case can be + // problematic if it is the first migration that ran in this block. But there is nothing + // that we can do about it here. + if meter.try_consume(required).is_err() { + return Err(SteppedMigrationError::InsufficientWeight { required }); + } + StorageVersion::new(3).put::>(); + + log::info!(target: LOG_TARGET, "Finalized messages pallet migration: storage version set to 3"); + Ok(None) + } + + #[cfg(feature = "try-runtime")] + fn pre_upgrade() -> Result, frame_support::sp_runtime::TryRuntimeError> { + // Return the storage version before the migration + Ok(StorageVersion::get::>().encode()) + } + + #[cfg(feature = "try-runtime")] + fn post_upgrade(prev: Vec) -> Result<(), frame_support::sp_runtime::TryRuntimeError> { + // Check the state of the storage after the migration. + let prev_version = ::decode(&mut &prev[..]) + .expect("Failed to decode the previous storage state"); + + // Check the len of prev and post are the same. + assert!( + StorageVersion::get::>() > prev_version, + "Migration failed: current storage version is not greater than the previous storage version" + ); + + Ok(()) + } +} diff --git a/pallets/messages/src/rpc/src/tests/mod.rs b/pallets/messages/src/rpc/src/tests/mod.rs index 9280c8f6be..805c2349de 100644 --- a/pallets/messages/src/rpc/src/tests/mod.rs +++ b/pallets/messages/src/rpc/src/tests/mod.rs @@ -36,16 +36,14 @@ fn test_messages() -> Vec { sp_api::mock_impl_runtime_apis! 
{ impl MessagesRuntimeApi for TestRuntimeApi { - fn get_schema_by_id(schema_id: SchemaId) -> Option { + fn get_schema_by_id(schema_id: SchemaId) -> Option { match schema_id { - SCHEMA_ID_EMPTY | SCHEMA_ID_HAS_MESSAGES => Some(SchemaResponseV2 { + SCHEMA_ID_EMPTY | SCHEMA_ID_HAS_MESSAGES => Some(SchemaResponse { schema_id, - intent_id: schema_id, model: b"schema".to_vec(), model_type: ModelType::AvroBinary, payload_location: PayloadLocation::OnChain, settings: Vec::new(), - status: SchemaStatus::Active, }), _ => None, } diff --git a/pallets/messages/src/runtime-api/src/lib.rs b/pallets/messages/src/runtime-api/src/lib.rs index f3a43cabee..102f4a733e 100644 --- a/pallets/messages/src/runtime-api/src/lib.rs +++ b/pallets/messages/src/runtime-api/src/lib.rs @@ -41,7 +41,7 @@ sp_api::decl_runtime_apis! { /// Retrieve the messages for a particular intent and block range (paginated) #[api_version(2)] - fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> Result, String>; + fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> BlockPaginationResponse; /// Retrieve a schema by id // TODO: Remove once all RPC nodes have been updated to call the schemas pallet runtime for this diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index 6bb42cbcda..6e6315a2f5 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -27,6 +27,8 @@ pub const INVALID_SCHEMA_ID: SchemaId = 65534; // this value should be the same as the one used in benchmarking pub const IPFS_SCHEMA_ID: SchemaId = 20; +pub const ON_CHAIN_SCHEMA_ID: SchemaId = 16001; + pub const IPFS_PAYLOAD_LENGTH: u32 = 1200; pub const DUMMY_CID_BASE32: &[u8; 59] = @@ -81,14 +83,7 @@ pub type MaxSchemaGrantsPerDelegation = ConstU32<30>; // Needs parameter_types! for the impls below parameter_types! { - // Max payload size was picked specifically to be large enough to accommodate - // a CIDv1 using SHA2-256, but too small to accommodate CIDv1 w/SHA2-512. - // This is purely so that we can test the error condition. Real world configuration - // should have this set large enough to accommodate the largest possible CID. - // Take care when adding new tests for on-chain (not IPFS) messages that the payload - // is not too big. 
- pub const MessagesMaxPayloadSizeBytes: u32 = 73; - + pub const MessagesMaxPayloadSizeBytes: u32 = 1024 * 3; } impl std::fmt::Debug for MessagesMaxPayloadSizeBytes { diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 8ca35572cc..531d8f9bac 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -1,72 +1,21 @@ -use crate::{tests::mock::*, BlockMessageIndex, Error, Event as MessageEvent, MapToResponse, Message, MessageV3, MessagesV2}; -use common_primitives::{messages::MessageResponse, schema::*}; +extern crate alloc; +use crate::{ + pallet::{Config, MessagesV3}, + tests::mock::*, + BlockMessageIndex, Error, Event as MessageEvent, MapToResponse, Message, +}; +use alloc::vec::Vec; +use common_primitives::{messages::MessageResponseV2, schema::*}; use frame_support::{assert_err, assert_noop, assert_ok, traits::OnInitialize, BoundedVec}; use frame_system::{EventRecord, Phase}; use multibase::Base; use parity_scale_codec::Encode; #[allow(unused_imports)] use pretty_assertions::{assert_eq, assert_ne, assert_str_eq}; -use rand::Rng; -use serde::Serialize; use sp_core::ConstU32; -extern crate alloc; -use alloc::vec::Vec; -use common_primitives::messages::MessageResponseV2; -use crate::pallet::{MessagesV3, StorageV3BlockNumber}; - -#[derive(Serialize)] -#[allow(non_snake_case)] -struct Payload { - // Normally would be u64, but we keep it to u8 in order to keep payload size down in tests. - fromId: u8, - content: String, -} pub const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4nga34vhp7t5orzwbwwytvp7ej44r5yhjzneanqwb5arcnvuvfwo2d4qgzyx5hymvto4"; -/// Populate mocked Messages storage with message data. -/// -/// # Arguments -/// * `schema_id` - Registered schema id to which stored messages should adhere -/// * `message_per_block` - A signed transaction origin from the provider -/// * `payload_location` - Determines how a message payload is encoded. PayloadLocation::IPFS -/// will encode (mock CID, IPFS_PAYLOAD_LENGTH) on the message payload. -fn populate_messages_v2( - schema_id: SchemaId, - message_per_block: Vec, - payload_location: PayloadLocation, - cid_in: Option<&[u8]>, -) { - let cid = match cid_in { - Some(val) => val, - None => &DUMMY_CID_BASE32[..], - }; - - let payload = match payload_location { - // Just stick Itemized & Paginated here for coverage; we don't use them for Messages - PayloadLocation::OnChain | PayloadLocation::Itemized | PayloadLocation::Paginated => - generate_payload(1, None), - PayloadLocation::IPFS => - (multibase::decode(core::str::from_utf8(cid).unwrap()).unwrap().1, IPFS_PAYLOAD_LENGTH) - .encode(), - }; - - let mut counter = 0; - for (idx, count) in message_per_block.iter().enumerate() { - for _ in 0..*count { - MessagesV2::::set( - (idx as u32, schema_id, counter), - Some(Message { - msa_id: Some(DUMMY_MSA_ID), - payload: payload.clone().try_into().unwrap(), - provider_msa_id: 1, - }), - ); - counter += 1; - } - } -} - /// Populate mocked Messages storage with message data. 
/// /// # Arguments @@ -80,15 +29,12 @@ fn populate_messages_v3( payload_location: PayloadLocation, cid_in: Option<&[u8]>, ) { - let cid = match cid_in { - Some(val) => val, - None => &DUMMY_CID_BASE32[..], - }; + let cid = cid_in.unwrap_or_else(|| &DUMMY_CID_BASE32[..]); let payload = match payload_location { // Just stick Itemized & Paginated here for coverage; we don't use them for Messages PayloadLocation::OnChain | PayloadLocation::Itemized | PayloadLocation::Paginated => - generate_payload(1, None), + generate_payload(1), PayloadLocation::IPFS => (multibase::decode(core::str::from_utf8(cid).unwrap()).unwrap().1, IPFS_PAYLOAD_LENGTH) .encode(), @@ -99,7 +45,7 @@ fn populate_messages_v3( for _ in 0..*count { MessagesV3::::set( (idx as u32, intent_id, counter), - Some(MessageV3 { + Some(Message { schema_id: intent_id as SchemaId, msa_id: Some(DUMMY_MSA_ID), payload: payload.clone().try_into().unwrap(), @@ -114,28 +60,9 @@ fn populate_messages_v3( /// Helper function to generate message payloads /// /// # Arguments -/// * `num_items` - Number of message to include in payload -/// * `content_len` - Length of content string to generate -fn generate_payload(num_items: u8, content_len: Option) -> Vec { - let mut result_str = String::new(); - let size = content_len.unwrap_or(3); - let mut rng = rand::rng(); - - for _ in 0..num_items { - let payload = serde_json::to_string(&Payload { - fromId: rng.random(), - content: (0..size).map(|_| "X").collect::(), - }) - .unwrap(); - - result_str.push_str(payload.as_str()); - } - - result_str.as_bytes().to_vec() -} - -fn set_v3_block(block_number: u32) { - StorageV3BlockNumber::::set(block_number); +/// * `content_len` - Length of payload +fn generate_payload(content_len: u32) -> Vec { + vec![1u8; content_len as usize].to_vec() } #[test] @@ -146,9 +73,9 @@ fn add_message_should_store_message_in_storage() { let caller_2 = 2; let schema_id_1: SchemaId = 1; let schema_id_2: SchemaId = 2; - let message_payload_1 = generate_payload(1, None); - let message_payload_2 = generate_payload(1, None); - let message_payload_3 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); + let message_payload_2 = generate_payload(1); + let message_payload_3 = generate_payload(1); // act assert_ok!(MessagesPallet::add_onchain_message( @@ -179,7 +106,7 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg1, - Some(MessageV3 { + Some(Message { schema_id: schema_id_1, msa_id: Some(get_msa_from_account(caller_1)), payload: message_payload_1.try_into().unwrap(), @@ -189,7 +116,7 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg2, - Some(MessageV3 { + Some(Message { schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_2.try_into().unwrap(), @@ -199,7 +126,7 @@ fn add_message_should_store_message_in_storage() { assert_eq!( msg3, - Some(MessageV3 { + Some(Message { schema_id: schema_id_2, msa_id: Some(get_msa_from_account(caller_2)), payload: message_payload_3.try_into().unwrap(), @@ -225,15 +152,15 @@ fn add_message_with_too_large_message_should_panic() { new_test_ext().execute_with(|| { // arrange let caller_1 = 5; - let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(3, None); + let message_payload_1 = + generate_payload(::MessagesMaxPayloadSizeBytes::get() + 1); // act assert_noop!( MessagesPallet::add_onchain_message( RuntimeOrigin::signed(caller_1), None, - schema_id_1, + ON_CHAIN_SCHEMA_ID, message_payload_1 ), Error::::ExceedsMaxMessagePayloadSizeBytes @@ 
-247,7 +174,7 @@ fn add_message_with_invalid_msa_account_errors() { // arrange let caller_1 = 1000; let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(2, None); + let message_payload_1 = generate_payload(2); // act assert_noop!( @@ -294,7 +221,6 @@ fn get_messages_by_intent_with_ipfs_payload_location_should_return_offchain_payl populate_messages_v3(intent_id, vec![1], PayloadLocation::IPFS, None); // Run to the block + - set_v3_block(current_block); run_to_block(current_block + 1); let list = @@ -340,7 +266,7 @@ fn retrieved_ipfs_message_cid_should_always_be_in_base32() { } #[test] -fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { +fn decode_message_with_invalid_payload_should_yield_empty_cid() { new_test_ext().execute_with(|| { let bad_message: Message = Message { payload: BoundedVec::try_from( @@ -349,8 +275,10 @@ fn get_messages_by_schema_with_ipfs_payload_location_should_fail_bad_schema() { .unwrap(), msa_id: Some(0), provider_msa_id: 1, + schema_id: 1, }; - let mapped_response = bad_message.map_to_response((0, PayloadLocation::IPFS, 0)).unwrap(); + let mapped_response: MessageResponseV2 = + bad_message.map_to_response((0, PayloadLocation::IPFS, 0)).unwrap(); assert_eq!( mapped_response.cid, Some(multibase::encode(Base::Base32Lower, Vec::new()).as_bytes().to_vec()) @@ -365,7 +293,7 @@ fn add_message_via_non_delegate_should_fail() { let message_producer = 1; let message_provider = 2000; let schema_id_1: SchemaId = 1; - let message_payload_1 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); // act assert_err!( MessagesPallet::add_onchain_message( @@ -378,18 +306,18 @@ fn add_message_via_non_delegate_should_fail() { ); // assert - let msg = MessagesV2::::get((1, schema_id_1, 0)); + let msg = MessagesV3::::get((1, schema_id_1, 0)); assert_eq!(msg, None); }); } #[test] -fn add_message_with_invalid_schema_id_should_error() { +fn add_onchain_message_with_invalid_schema_id_should_error() { new_test_ext().execute_with(|| { // arrange let caller_1 = 5; let schema_id_1: SchemaId = INVALID_SCHEMA_ID; - let message_payload_1 = generate_payload(2, None); + let message_payload_1 = generate_payload(2); // act assert_err!( @@ -439,7 +367,7 @@ fn valid_payload_location_ipfs() { } #[test] -fn invalid_payload_location_ipfs() { +fn add_ipfs_message_with_invalid_payload_location_should_fail() { new_test_ext().execute_with(|| { let caller_1 = 5; let schema_id_1: SchemaId = 1; @@ -457,10 +385,10 @@ fn invalid_payload_location_ipfs() { } #[test] -fn invalid_payload_location_onchain() { +fn add_onchain_message_with_invalid_payload_location_should_fail() { new_test_ext().execute_with(|| { let caller_1 = 5; - let payload = generate_payload(1, None); + let payload = generate_payload(1); assert_noop!( MessagesPallet::add_onchain_message( @@ -474,24 +402,6 @@ fn invalid_payload_location_onchain() { }); } -#[test] -fn add_ipfs_message_with_large_payload_errors() { - new_test_ext().execute_with(|| { - let caller_1 = 5u64; - - assert_noop!( - MessagesPallet::add_ipfs_message( - RuntimeOrigin::signed(caller_1), - IPFS_SCHEMA_ID, - // We've deliberately mocked MaxMessagePayloadSizeBytes to be too small to contain a CIDv1 with a SHA2-512 hash. 
- DUMMY_CID_SHA512.as_bytes().to_vec(), - 15 - ), - Error::::ExceedsMaxMessagePayloadSizeBytes - ); - }) -} - #[test] fn add_ipfs_message_cid_v0_errors() { new_test_ext().execute_with(|| { @@ -535,8 +445,8 @@ fn on_initialize_should_clean_up_temporary_storage() { let caller_2 = 2; let schema_id_1: SchemaId = 1; let schema_id_2: SchemaId = 2; - let message_payload_1 = generate_payload(1, None); - let message_payload_2 = generate_payload(1, None); + let message_payload_1 = generate_payload(1); + let message_payload_2 = generate_payload(1); assert_ok!(MessagesPallet::add_onchain_message( RuntimeOrigin::signed(caller_1), None, @@ -637,7 +547,7 @@ fn validate_cid_not_correct_format_errors() { } #[test] -fn validate_cid_unwrap_panics() { +fn validate_cid_unwrap_errors() { new_test_ext().execute_with(|| { // This should not panic, but should return an error. let bad_cid = vec![102, 70, 70, 70, 70, 70, 70, 70, 70, 48, 48, 48, 54, 53, 53, 48, 48]; @@ -650,8 +560,13 @@ fn validate_cid_unwrap_panics() { fn map_to_response_on_chain() { let payload_vec = b"123456789012345678901234567890".to_vec(); let payload_bounded = BoundedVec::>::try_from(payload_vec.clone()).unwrap(); - let msg = Message { payload: payload_bounded, provider_msa_id: DUMMY_MSA_ID, msa_id: None }; - let expected = MessageResponse { + let msg = Message { + payload: payload_bounded, + provider_msa_id: DUMMY_MSA_ID, + msa_id: None, + schema_id: 1, + }; + let expected = MessageResponseV2 { provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, @@ -659,8 +574,9 @@ fn map_to_response_on_chain() { payload: Some(payload_vec), cid: None, payload_length: None, + schema_id: 1, }; - assert_eq!(msg.map_to_response((42, PayloadLocation::OnChain, 1)).unwrap(), expected); + assert_eq!(msg.map_to_response((42, 1, PayloadLocation::OnChain, 1)).unwrap(), expected); } #[test] @@ -668,8 +584,8 @@ fn map_to_response_ipfs() { let cid = DUMMY_CID_SHA512; let payload_tuple: crate::OffchainPayloadType = (multibase::decode(cid).unwrap().1, 10); let payload = BoundedVec::>::try_from(payload_tuple.encode()).unwrap(); - let msg = Message { payload, provider_msa_id: 10u64, msa_id: None }; - let expected = MessageResponse { + let msg = Message { payload, provider_msa_id: 10u64, msa_id: None, schema_id: 1 }; + let expected = MessageResponseV2 { provider_msa_id: DUMMY_MSA_ID, index: 1u16, block_number: 42, @@ -677,6 +593,7 @@ fn map_to_response_ipfs() { payload: None, cid: Some(cid.as_bytes().to_vec()), payload_length: Some(10), + schema_id: 1, }; - assert_eq!(msg.map_to_response((42, PayloadLocation::IPFS, 1)).unwrap(), expected); + assert_eq!(msg.map_to_response((42, 1, PayloadLocation::IPFS, 1)).unwrap(), expected); } diff --git a/pallets/messages/src/types.rs b/pallets/messages/src/types.rs index 5f3747c26e..92ea807337 100644 --- a/pallets/messages/src/types.rs +++ b/pallets/messages/src/types.rs @@ -23,25 +23,6 @@ pub type MessageIndex = u16; #[scale_info(skip_type_params(MaxDataSize))] #[codec(mel_bound(MaxDataSize: MaxEncodedLen))] pub struct Message -where - MaxDataSize: Get + Debug, -{ - /// Data structured by the associated schema's model. - pub payload: BoundedVec, - /// Message source account id of the Provider. This may be the same id as contained in `msa_id`, - /// indicating that the original source MSA is acting as its own provider. An id differing from that - /// of `msa_id` indicates that `provider_msa_id` was delegated by `msa_id` to send this message on - /// its behalf. 
- pub provider_msa_id: MessageSourceId, - /// Message source account id (the original source). - pub msa_id: Option, -} - -/// A single message type definition. -#[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] -#[scale_info(skip_type_params(MaxDataSize))] -#[codec(mel_bound(MaxDataSize: MaxEncodedLen))] -pub struct MessageV3 where MaxDataSize: Get + Debug, { @@ -144,7 +125,7 @@ impl + Debug> } impl + Debug> - MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponseV2> for MessageV3 + MapToResponse<(BlockNumber, PayloadLocation, u16), MessageResponseV2> for Message { /// Helper function to handle response type [`MessageResponseV2`] depending on the Payload Location (on chain or IPFS) fn map_to_response( diff --git a/pallets/messages/src/weights.rs b/pallets/messages/src/weights.rs index ce9a7e2604..f0a1e01ea4 100644 --- a/pallets/messages/src/weights.rs +++ b/pallets/messages/src/weights.rs @@ -35,6 +35,8 @@ use core::marker::PhantomData; pub trait WeightInfo { fn add_onchain_message(n: u32, ) -> Weight; fn add_ipfs_message() -> Weight; + fn v2_to_v3_step() -> Weight; + fn v2_to_v3_final_step() -> Weight; } /// Weights for `pallet_messages` using the Substrate node and recommended hardware. @@ -75,6 +77,29 @@ impl WeightInfo for SubstrateWeight { .saturating_add(T::DbWeight::get().reads(2_u64)) .saturating_add(T::DbWeight::get().writes(1_u64)) } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Storage: `Messages::MessagesV3` (r:0 w:1) + /// Proof: `Messages::MessagesV3` (`max_values`: None, `max_size`: Some(3125), added: 5600, mode: `MaxEncodedLen`) + fn v2_to_v3_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `3113` + // Estimated: `9548` + // Minimum execution time: 13_000_000 picoseconds. + Weight::from_parts(14_000_000, 9548) + .saturating_add(T::DbWeight::get().reads(2_u64)) + .saturating_add(T::DbWeight::get().writes(2_u64)) + } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + fn v2_to_v3_final_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `0` + // Estimated: `0` + // Minimum execution time: 1_000_000 picoseconds. + Weight::from_parts(2_000_000, 0) + .saturating_add(T::DbWeight::get().writes(1_u64)) + } } // For backwards compatibility and tests. @@ -114,6 +139,29 @@ impl WeightInfo for () { .saturating_add(RocksDbWeight::get().reads(2_u64)) .saturating_add(RocksDbWeight::get().writes(1_u64)) } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f963740dfb0edb77dddcbb1e5542cf4d2` (r:2 w:1) + /// Storage: `Messages::MessagesV3` (r:0 w:1) + /// Proof: `Messages::MessagesV3` (`max_values`: None, `max_size`: Some(3125), added: 5600, mode: `MaxEncodedLen`) + fn v2_to_v3_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `3113` + // Estimated: `9548` + // Minimum execution time: 13_000_000 picoseconds. 
+ Weight::from_parts(14_000_000, 9548) + .saturating_add(RocksDbWeight::get().reads(2_u64)) + .saturating_add(RocksDbWeight::get().writes(2_u64)) + } + /// Storage: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + /// Proof: UNKNOWN KEY `0x9ea3e2d10fdb9a071f2f534d51b0961f4e7b9012096b41c4eb3aaf947f6ea429` (r:0 w:1) + fn v2_to_v3_final_step() -> Weight { + // Proof Size summary in bytes: + // Measured: `0` + // Estimated: `0` + // Minimum execution time: 1_000_000 picoseconds. + Weight::from_parts(2_000_000, 0) + .saturating_add(RocksDbWeight::get().writes(1_u64)) + } } diff --git a/runtime/frequency/Cargo.toml b/runtime/frequency/Cargo.toml index 509a646eef..bc38979898 100644 --- a/runtime/frequency/Cargo.toml +++ b/runtime/frequency/Cargo.toml @@ -39,6 +39,7 @@ pallet-democracy = { workspace = true } pallet-collective = { workspace = true } pallet-session = { workspace = true } pallet-sudo = { workspace = true } +pallet-migrations = { workspace = true } pallet-multisig = { workspace = true } pallet-timestamp = { workspace = true } pallet-transaction-payment = { workspace = true } @@ -142,6 +143,7 @@ std = [ "pallet-passkey/std", "pallet-messages-runtime-api/std", "pallet-messages/std", + "pallet-migrations/std", "pallet-msa-runtime-api/std", "pallet-msa/std", "pallet-multisig/std", @@ -212,6 +214,7 @@ runtime-benchmarks = [ "pallet-handles/runtime-benchmarks", "pallet-passkey/runtime-benchmarks", "pallet-messages/runtime-benchmarks", + "pallet-migrations/runtime-benchmarks", "pallet-msa/runtime-benchmarks", "pallet-multisig/runtime-benchmarks", "pallet-preimage/runtime-benchmarks", @@ -254,6 +257,7 @@ try-runtime = [ "pallet-handles/try-runtime", "pallet-passkey/try-runtime", "pallet-messages/try-runtime", + "pallet-migrations/try-runtime", "pallet-msa/try-runtime", "pallet-multisig/try-runtime", "pallet-preimage/try-runtime", diff --git a/runtime/frequency/src/lib.rs b/runtime/frequency/src/lib.rs index 7723ea8bb9..df4dc3bf0b 100644 --- a/runtime/frequency/src/lib.rs +++ b/runtime/frequency/src/lib.rs @@ -142,7 +142,7 @@ use frame_system::{ }; use alloc::{boxed::Box, vec, vec::Vec}; - +use core::ops::ControlFlow; pub use sp_consensus_aura::sr25519::AuthorityId as AuraId; pub use sp_runtime::Perbill; @@ -159,7 +159,11 @@ pub use pallet_time_release::types::{ScheduleName, SchedulerProviderTrait}; // Polkadot Imports use polkadot_runtime_common::{BlockHashCount, SlowAdjustingFeeUpdate}; -use common_primitives::{capacity::UnclaimedRewardInfo, schema::*}; +use common_primitives::{ + capacity::UnclaimedRewardInfo, + messages::{BlockPaginationRequest, BlockPaginationResponse, MessageResponseV2}, + schema::*, +}; use common_runtime::weights::rocksdb_weights::constants::RocksDbWeight; pub use common_runtime::{ constants::MaxSchemaGrants, @@ -781,7 +785,7 @@ impl frame_system::Config for Runtime { /// A new way of configuring migrations that run in a single block. type SingleBlockMigrations = (); /// The migrator that is used to run Multi-Block-Migrations. - type MultiBlockMigrator = (); + type MultiBlockMigrator = MultiBlockMigrations; /// A callback that executes in *every block* directly before all inherents were applied. type PreInherents = (); /// A callback that executes in *every block* directly after all inherents were applied. @@ -1643,6 +1647,34 @@ impl pallet_utility::Config for Runtime { type WeightInfo = weights::pallet_utility::SubstrateWeight; } +parameter_types! 
{ + pub MbmServiceWeight: Weight = Perbill::from_percent(80) * RuntimeBlockWeights::get().max_block; +} + +impl pallet_migrations::Config for Runtime { + type RuntimeEvent = RuntimeEvent; + #[cfg(not(feature = "runtime-benchmarks"))] + type Migrations = ( + pallet_messages::migration::MigrateV2ToV3< + Runtime, + pallet_messages::weights::SubstrateWeight, + >, + pallet_messages::migration::FinalizeV3Migration< + Runtime, + pallet_messages::weights::SubstrateWeight, + >, + ); + // Benchmarks need mocked migrations to guarantee that they succeed. + #[cfg(feature = "runtime-benchmarks")] + type Migrations = pallet_migrations::mock_helpers::MockedMigrations; + type CursorMaxLen = ConstU32<65_536>; + type IdentifierMaxLen = ConstU32<256>; + type MigrationStatusHandler = (); + type FailedMigrationHandler = frame_support::migrations::FreezeChainOnFailedMigration; + type MaxServiceWeight = MbmServiceWeight; + type WeightInfo = pallet_migrations::weights::SubstrateWeight; +} + // Create the runtime by composing the FRAME pallets that were previously configured. construct_runtime!( pub enum Runtime { @@ -1696,6 +1728,9 @@ construct_runtime!( // Substrate weights WeightReclaim: cumulus_pallet_weight_reclaim::{Pallet, Storage} = 50, + // Multi-block migrations + MultiBlockMigrations: pallet_migrations::{Pallet, Event} = 51, + // Frequency related pallets Msa: pallet_msa::{Pallet, Call, Storage, Event} = 60, Messages: pallet_messages::{Pallet, Call, Storage, Event} = 61, @@ -1746,6 +1781,7 @@ mod benches { [pallet_transaction_payment, TransactionPayment] [cumulus_pallet_xcmp_queue, XcmpQueue] [pallet_message_queue, MessageQueue] + [pallet_migrations, MultiBlockMigrations] // Frequency [pallet_msa, Msa] @@ -1972,14 +2008,72 @@ sp_api::impl_runtime_apis! { } // Frequency runtime APIs + #[api_version(2)] impl pallet_messages_runtime_api::MessagesRuntimeApi for Runtime { fn get_messages_by_schema_and_block(schema_id: SchemaId, schema_payload_location: PayloadLocation, block_number: BlockNumber,) -> Vec { - Messages::get_messages_by_schema_and_block(schema_id, schema_payload_location, block_number) + match Schemas::get_schema_by_id(schema_id) { + Some(SchemaResponseV2 { intent_id, .. }) => Messages::get_messages_by_intent_and_block( + intent_id, + schema_payload_location, + block_number, + ).into_iter().map(|r| r.into()).collect(), + _ => vec![], + } } - fn get_schema_by_id(schema_id: SchemaId) -> Option { - Schemas::get_schema_by_id(schema_id) + /// Retrieve the messages for a particular intent and block range (paginated) + fn get_messages_by_intent_id(intent_id: IntentId, pagination: BlockPaginationRequest) -> BlockPaginationResponse { + let mut response = BlockPaginationResponse::new(); + + // Request Validation + if !pagination.validate() { + return response + } + + // Schema Fetch and Check + let intent = match Schemas::get_intent_by_id(intent_id, false) { + Some(intent) => intent, + None => return response, + }; + + let mut from_index: u32 = pagination.from_index; + + (pagination.from_block..pagination.to_block).try_for_each(|block_number| { + let list: Vec = Messages::get_messages_by_intent_and_block( + intent_id, + intent.payload_location, + block_number, + ); + + // Max messages in a block are constrained to MessageIndex (u16) by the storage, + // so this is a safe type coercion. 
Just to be safe, we'll trap in in debug builds + let list_size: u32 = list.len() as u32; + debug_assert!(list_size <= u16::MAX.into(), "unexpected number of messages in block"); + + let iter = list.into_iter().skip((from_index as usize).saturating_sub(1)); + // all subsequent blocks in this call should start at index 0 + from_index = 0; + iter.enumerate().try_for_each(|(i, m)| { + response.content.push(m); + + if response.check_end_condition_and_set_next_pagination( + block_number, + i as u32, + list_size, + &pagination, + ) { + return ControlFlow::Break(()) + } + + ControlFlow::Continue(()) + }) + }); + response + } + + fn get_schema_by_id(schema_id: SchemaId) -> Option { + Schemas::get_schema_by_id(schema_id).map(|r| r.into()) } } From df27cf9e1b185804a7531c57a399286f3367e107 Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Mon, 29 Sep 2025 15:32:46 -0400 Subject: [PATCH 3/7] unit tests for migrations --- Cargo.lock | 1 + pallets/messages/Cargo.toml | 1 + pallets/messages/src/migration/mod.rs | 3 + pallets/messages/src/migration/tests.rs | 71 ++++++++++++++++++++++++ pallets/messages/src/migration/v2/mod.rs | 2 +- pallets/messages/src/tests/mock.rs | 39 ++++++++++++- 6 files changed, 115 insertions(+), 2 deletions(-) create mode 100644 pallets/messages/src/migration/tests.rs diff --git a/Cargo.lock b/Cargo.lock index cf8f07a1d1..17c40b9c10 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7789,6 +7789,7 @@ dependencies = [ "frame-system", "log", "multibase", + "pallet-migrations", "parity-scale-codec", "pretty_assertions", "rand 0.9.1", diff --git a/pallets/messages/Cargo.toml b/pallets/messages/Cargo.toml index 54b77c0bf8..a0e9f91f10 100644 --- a/pallets/messages/Cargo.toml +++ b/pallets/messages/Cargo.toml @@ -32,6 +32,7 @@ multibase = { version = "0.9", default-features = false } common-runtime = { path = '../../runtime/common', default-features = false } # Testing dependencies rand = { workspace = true } +pallet-migrations = { workspace = true } pretty_assertions = { workspace = true } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } diff --git a/pallets/messages/src/migration/mod.rs b/pallets/messages/src/migration/mod.rs index 77b7d1f2e3..e61370570f 100644 --- a/pallets/messages/src/migration/mod.rs +++ b/pallets/messages/src/migration/mod.rs @@ -3,5 +3,8 @@ pub mod v2; /// Migration module for migrating from V2 to V3 pub mod v3; +#[cfg(all(test, not(feature = "runtime-benchmarks")))] + +mod tests; pub use v3::{FinalizeV3Migration, MigrateV2ToV3}; diff --git a/pallets/messages/src/migration/tests.rs b/pallets/messages/src/migration/tests.rs new file mode 100644 index 0000000000..6248c5f9cd --- /dev/null +++ b/pallets/messages/src/migration/tests.rs @@ -0,0 +1,71 @@ +use frame_support::BoundedVec; +use crate::{migration::{ + v2, +}, pallet, tests::mock::{ + new_test_ext, run_to_block_with_migrations, AllPalletsWithSystem, MigratorServiceWeight, Test as T, + System, +}, weights}; +use frame_support::{pallet_prelude::StorageVersion, traits::OnRuntimeUpgrade}; +use frame_system::pallet_prelude::BlockNumberFor; +use pallet_migrations::WeightInfo as _; + +#[test] +fn lazy_migration_works() { + new_test_ext().execute_with(|| { + const MESSAGE_COUNT: u16 = 500; + const ITEMS_PER_BLOCK: u16 = 16; + const BLOCKS_PER_RUN: u16 = MESSAGE_COUNT.div_ceil(ITEMS_PER_BLOCK); + frame_support::__private::sp_tracing::try_init_simple(); + // Insert some values into the old storage map. 
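+ // 500 messages with 3 KiB payloads go directly into the retired v2 map,
+ // giving the stepped migration several blocks' worth of items to move.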
+ let payload: BoundedVec::MessagesMaxPayloadSizeBytes> = + vec![1; 3072].try_into().expect("Unable to create BoundedVec payload"); + let old_message = v2::Message { + payload: payload.clone(), + provider_msa_id: 1u64, + msa_id: Some(1u64), + }; + let mut keys: Vec<(BlockNumberFor, u16, u16)> = Vec::new(); + for i in 0..MESSAGE_COUNT { + keys.push((BlockNumberFor::::default(), 0u16, i)); + v2::MessagesV2::::insert( + (BlockNumberFor::::default(), 0u16, i), + old_message.clone(), + ); + } + + // Give it enough weight do do exactly 16 iterations: + let limit = ::WeightInfo::progress_mbms_none() + + pallet_migrations::Pallet::::exec_migration_max_weight() + + as crate::weights::WeightInfo>::v2_to_v3_step() * ITEMS_PER_BLOCK as u64; + MigratorServiceWeight::set(&limit); + + StorageVersion::new(2).put::>(); + System::set_block_number(1); + AllPalletsWithSystem::on_runtime_upgrade(); // onboard MBMs + + let mut v2_expected: u16 = 500; + let mut v3_expected: u16 = 0; + for block in 2..=(BLOCKS_PER_RUN + 2) { + println!("Block: {}", block); + run_to_block_with_migrations(block as u32); + + v2_expected = v2_expected.saturating_sub(ITEMS_PER_BLOCK); + v3_expected += ITEMS_PER_BLOCK; + v3_expected = u16::min(v3_expected, MESSAGE_COUNT); + + let v2_count = v2::MessagesV2::::iter().count(); + let v3_count = crate::MessagesV3::::iter().count(); + assert_eq!(v2_expected as usize, v2_count); + assert_eq!(v3_expected as usize, v3_count); + } + + // Check that everything is in new storage now: + for i in keys.iter() { + assert!(crate::MessagesV3::::contains_key(i)); + } + + // Verify that the version is updated: + let current_version = StorageVersion::get::>(); + assert_eq!(current_version, 3); + }); +} diff --git a/pallets/messages/src/migration/v2/mod.rs b/pallets/messages/src/migration/v2/mod.rs index 4726ecf676..5c7b3f7550 100644 --- a/pallets/messages/src/migration/v2/mod.rs +++ b/pallets/messages/src/migration/v2/mod.rs @@ -21,7 +21,7 @@ pub(crate) type MessagesV2 = StorageNMap< >; /// A single message type definition for V2 storage -#[derive(Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] +#[derive(Clone, Default, Encode, Decode, PartialEq, Debug, TypeInfo, Eq, MaxEncodedLen)] #[scale_info(skip_type_params(MaxDataSize))] #[codec(mel_bound(MaxDataSize: MaxEncodedLen))] pub struct Message diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index 6e6315a2f5..784bb783df 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -1,3 +1,5 @@ +use frame_support::derive_impl; +use frame_support::pallet_prelude::Weight; use crate as pallet_messages; use common_primitives::{ msa::{ @@ -9,6 +11,7 @@ use common_primitives::{ use frame_support::{ dispatch::DispatchResult, + migrations::MultiStepMigrator, parameter_types, traits::{ConstU16, ConstU32, OnFinalize, OnInitialize}, }; @@ -43,6 +46,7 @@ frame_support::construct_runtime!( { System: frame_system::{Pallet, Call, Config, Storage, Event}, MessagesPallet: pallet_messages::{Pallet, Call, Storage, Event}, + Migrator: pallet_migrations, } ); @@ -72,13 +76,35 @@ impl system::Config for Test { type OnSetCode = (); type MaxConsumers = ConstU32<16>; type SingleBlockMigrations = (); - type MultiBlockMigrator = (); + type MultiBlockMigrator = Migrator; type PreInherents = (); type PostInherents = (); type PostTransactions = (); type ExtensionsWeightInfo = (); } +frame_support::parameter_types! 
{ + pub storage MigratorServiceWeight: Weight = Weight::from_parts(100, 100); // do not use in prod +} + +#[derive_impl(pallet_migrations::config_preludes::TestDefaultConfig)] +impl pallet_migrations::Config for Test { + #[cfg(not(feature = "runtime-benchmarks"))] + type Migrations = ( + crate::migration::v3::MigrateV2ToV3< + Test, + crate::SubstrateWeight, + >, + crate::migration::v3::FinalizeV3Migration< + Test, + crate::SubstrateWeight, + >, + ); + #[cfg(feature = "runtime-benchmarks")] + type Migrations = pallet_migrations::mock_helpers::MockedMigrations; + type MaxServiceWeight = MigratorServiceWeight; +} + pub type MaxSchemaGrantsPerDelegation = ConstU32<30>; // Needs parameter_types! for the impls below @@ -273,6 +299,17 @@ pub fn run_to_block(n: u32) { } } +#[allow(dead_code)] +pub fn run_to_block_with_migrations(n: u32) { + System::run_to_block_with::( + n, + frame_system::RunToBlockHooks::default().after_initialize(|_| { + // Done by Executive: + ::MultiBlockMigrator::step(); + }), + ); +} + pub fn get_msa_from_account(account_id: u64) -> u64 { account_id + 100 } From 5bc7941e0214756e0a313cf9bb9af4086c9d7352 Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Mon, 29 Sep 2025 15:33:09 -0400 Subject: [PATCH 4/7] formatting --- pallets/messages/src/migration/mod.rs | 5 ++--- pallets/messages/src/migration/tests.rs | 28 ++++++++++++------------- pallets/messages/src/tests/mock.rs | 13 +++--------- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/pallets/messages/src/migration/mod.rs b/pallets/messages/src/migration/mod.rs index e61370570f..82f8d8762a 100644 --- a/pallets/messages/src/migration/mod.rs +++ b/pallets/messages/src/migration/mod.rs @@ -1,10 +1,9 @@ /// Migration module for migrating from V2 to V3 pub mod v2; -/// Migration module for migrating from V2 to V3 -pub mod v3; #[cfg(all(test, not(feature = "runtime-benchmarks")))] - mod tests; +/// Migration module for migrating from V2 to V3 +pub mod v3; pub use v3::{FinalizeV3Migration, MigrateV2ToV3}; diff --git a/pallets/messages/src/migration/tests.rs b/pallets/messages/src/migration/tests.rs index 6248c5f9cd..031d72c7c3 100644 --- a/pallets/messages/src/migration/tests.rs +++ b/pallets/messages/src/migration/tests.rs @@ -1,11 +1,13 @@ -use frame_support::BoundedVec; -use crate::{migration::{ - v2, -}, pallet, tests::mock::{ - new_test_ext, run_to_block_with_migrations, AllPalletsWithSystem, MigratorServiceWeight, Test as T, - System, -}, weights}; -use frame_support::{pallet_prelude::StorageVersion, traits::OnRuntimeUpgrade}; +use crate::{ + migration::v2, + pallet, + tests::mock::{ + new_test_ext, run_to_block_with_migrations, AllPalletsWithSystem, MigratorServiceWeight, + System, Test as T, + }, + weights, +}; +use frame_support::{pallet_prelude::StorageVersion, traits::OnRuntimeUpgrade, BoundedVec}; use frame_system::pallet_prelude::BlockNumberFor; use pallet_migrations::WeightInfo as _; @@ -19,11 +21,8 @@ fn lazy_migration_works() { // Insert some values into the old storage map. 
let payload: BoundedVec::MessagesMaxPayloadSizeBytes> = vec![1; 3072].try_into().expect("Unable to create BoundedVec payload"); - let old_message = v2::Message { - payload: payload.clone(), - provider_msa_id: 1u64, - msa_id: Some(1u64), - }; + let old_message = + v2::Message { payload: payload.clone(), provider_msa_id: 1u64, msa_id: Some(1u64) }; let mut keys: Vec<(BlockNumberFor, u16, u16)> = Vec::new(); for i in 0..MESSAGE_COUNT { keys.push((BlockNumberFor::::default(), 0u16, i)); @@ -36,7 +35,8 @@ fn lazy_migration_works() { // Give it enough weight do do exactly 16 iterations: let limit = ::WeightInfo::progress_mbms_none() + pallet_migrations::Pallet::::exec_migration_max_weight() + - as crate::weights::WeightInfo>::v2_to_v3_step() * ITEMS_PER_BLOCK as u64; + as crate::weights::WeightInfo>::v2_to_v3_step() * + ITEMS_PER_BLOCK as u64; MigratorServiceWeight::set(&limit); StorageVersion::new(2).put::>(); diff --git a/pallets/messages/src/tests/mock.rs b/pallets/messages/src/tests/mock.rs index 784bb783df..d6fb1e782c 100644 --- a/pallets/messages/src/tests/mock.rs +++ b/pallets/messages/src/tests/mock.rs @@ -1,5 +1,3 @@ -use frame_support::derive_impl; -use frame_support::pallet_prelude::Weight; use crate as pallet_messages; use common_primitives::{ msa::{ @@ -8,6 +6,7 @@ use common_primitives::{ }, schema::*, }; +use frame_support::{derive_impl, pallet_prelude::Weight}; use frame_support::{ dispatch::DispatchResult, @@ -91,14 +90,8 @@ frame_support::parameter_types! { impl pallet_migrations::Config for Test { #[cfg(not(feature = "runtime-benchmarks"))] type Migrations = ( - crate::migration::v3::MigrateV2ToV3< - Test, - crate::SubstrateWeight, - >, - crate::migration::v3::FinalizeV3Migration< - Test, - crate::SubstrateWeight, - >, + crate::migration::v3::MigrateV2ToV3>, + crate::migration::v3::FinalizeV3Migration>, ); #[cfg(feature = "runtime-benchmarks")] type Migrations = pallet_migrations::mock_helpers::MockedMigrations; From af595db59ff7bbd60be6b94295e24e7c6223b5cb Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Tue, 30 Sep 2025 15:57:56 -0400 Subject: [PATCH 5/7] fix: allow Blake3 hash for CIDv1 --- pallets/messages/src/lib.rs | 3 +- pallets/messages/src/migration/tests.rs | 1 - pallets/messages/src/tests/other_tests.rs | 71 ++++++++++++++++++++++- pallets/messages/src/types.rs | 5 ++ 4 files changed, 75 insertions(+), 5 deletions(-) diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs index 0eac173e4a..113e974e2c 100644 --- a/pallets/messages/src/lib.rs +++ b/pallets/messages/src/lib.rs @@ -390,7 +390,8 @@ impl Pallet { // Assume it's a multibase-encoded string. Decode it to a byte array so we can parse the CID. 
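+ // Only CIDs whose multihash is SHA2-256 or Blake3 are accepted below;
+ // any other digest algorithm is rejected as InvalidCid.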
let cid_b = multibase::decode(cid_str).map_err(|_| Error::::InvalidCid)?.1; - ensure!(Cid::read_bytes(&cid_b[..]).is_ok(), Error::::InvalidCid); + let cid = Cid::read_bytes(&cid_b[..]).map_err(|_| Error::::InvalidCid)?; + ensure!([SHA2_256, BLAKE3].contains(&cid.hash().code()), Error::::InvalidCid); Ok(cid_b) } diff --git a/pallets/messages/src/migration/tests.rs b/pallets/messages/src/migration/tests.rs index 031d72c7c3..4c33f2f826 100644 --- a/pallets/messages/src/migration/tests.rs +++ b/pallets/messages/src/migration/tests.rs @@ -46,7 +46,6 @@ fn lazy_migration_works() { let mut v2_expected: u16 = 500; let mut v3_expected: u16 = 0; for block in 2..=(BLOCKS_PER_RUN + 2) { - println!("Block: {}", block); run_to_block_with_migrations(block as u32); v2_expected = v2_expected.saturating_sub(ITEMS_PER_BLOCK); diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 531d8f9bac..d05c8497ae 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -15,6 +15,8 @@ use pretty_assertions::{assert_eq, assert_ne, assert_str_eq}; use sp_core::ConstU32; pub const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4nga34vhp7t5orzwbwwytvp7ej44r5yhjzneanqwb5arcnvuvfwo2d4qgzyx5hymvto4"; +pub const DUMMY_CID_SHA256: &str = "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea"; +pub const DUMMY_CID_BLAKE3: &str = "bafkr4ihn4xalcdzoyslzy2nvf5q6il7vwqjvdhhatpqpctijrxh6l5xzru"; /// Populate mocked Messages storage with message data. /// @@ -402,6 +404,52 @@ fn add_onchain_message_with_invalid_payload_location_should_fail() { }); } +#[test] +fn add_ipfs_message_with_unsupported_cid_hash_should_fail() { + new_test_ext().execute_with(|| { + let caller_1 = 5u64; + + assert_noop!( + MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_SHA512.as_bytes().to_vec(), + 15 + ), + Error::::InvalidCid + ); + }) +} + +#[test] +fn add_ipfs_message_with_sha2_256_cid_should_succeed() { + new_test_ext().execute_with(|| { + let caller_1 = 5u64; + + assert_ok!( + MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_SHA256.as_bytes().to_vec(), + 15 + )); + }) +} + +#[test] +fn add_ipfs_message_with_blake3_cid_should_succeed() { + new_test_ext().execute_with(|| { + let caller_1 = 5u64; + + assert_ok!( + MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_BLAKE3.as_bytes().to_vec(), + 15 + )); + }) +} #[test] fn add_ipfs_message_cid_v0_errors() { new_test_ext().execute_with(|| { @@ -514,14 +562,31 @@ fn validate_cid_invalid_cid_errors() { } #[test] -fn validate_cid_valid_cid_succeeds() { +fn validate_cid_valid_cid_sha2_256_succeeds() { new_test_ext().execute_with(|| { - let bad_cid = DUMMY_CID_BASE32.to_vec(); + let cid = DUMMY_CID_BASE32.to_vec(); - assert_ok!(MessagesPallet::validate_cid(&bad_cid)); + assert_ok!(MessagesPallet::validate_cid(&cid)); }) } +#[test] +fn validate_cid_valid_cid_blake3_succeeds() { + new_test_ext().execute_with(|| { + let cid = DUMMY_CID_BLAKE3.as_bytes().to_vec(); + + assert_ok!(MessagesPallet::validate_cid(&cid)); + }) +} + +#[test] +fn validate_cid_invalid_hash_function_errors() { + new_test_ext().execute_with(|| { + let bad_cid = DUMMY_CID_SHA512.as_bytes().to_vec(); + + assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); + }) +} #[test] fn validate_cid_not_utf8_aligned_errors() { new_test_ext().execute_with(|| { diff 
--git a/pallets/messages/src/types.rs b/pallets/messages/src/types.rs index 92ea807337..96e42674ce 100644 --- a/pallets/messages/src/types.rs +++ b/pallets/messages/src/types.rs @@ -13,6 +13,11 @@ extern crate alloc; use alloc::vec::Vec; use common_primitives::messages::MessageResponseV2; +/// Multihash hash function code for SHA2-256 +pub const SHA2_256: u64 = 0x12; +/// Multihash hash function code for Blake3 +pub const BLAKE3: u64 = 0x1e; + /// Payloads stored offchain contain a tuple of (bytes(the payload reference), payload length). pub type OffchainPayloadType = (Vec, u32); /// Index of message in the block From 30ef1d0bf1340562b22e5ff7181eaa3f97c818c4 Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Tue, 30 Sep 2025 16:23:50 -0400 Subject: [PATCH 6/7] format and README --- pallets/messages/README.md | 44 ++++++++++++++--------- pallets/messages/src/tests/other_tests.rs | 26 +++++++------- 2 files changed, 40 insertions(+), 30 deletions(-) diff --git a/pallets/messages/README.md b/pallets/messages/README.md index 46c490b694..35ad45b1c7 100644 --- a/pallets/messages/README.md +++ b/pallets/messages/README.md @@ -4,16 +4,16 @@ Stores block-indexed data for a Schema using `OnChain` or `IPFS` payload locatio ## Summary -Messages stores metadata and message payload data for a set Schema. -Message payloads are meant for streaming data, where when the message was sent is the focus. +`messages` stores metadata and message payload data for a set Schema. +Message payloads are meant for streaming data, where _when_ the message was sent is the focus. Discovery is via the `MessagesInBlock` on new blocks or requesting messages from a block. -Retrieval is via RPC. +Retrieval is via a custom runtime API. ### Metadata vs Payload Messages have both metadata and payloads. The payload should always match the data structure or the message is considered invalid. -The metadata is the Block Number, Schema Id, and other data useful for discovering and organizing the payload information. +The metadata is the Block Number, Intent Id, and other data useful for discovering and organizing the payload information. ### Payload Options @@ -22,35 +22,36 @@ The metadata is the Block Number, Schema Id, and other data useful for discoveri ### Message Ordering -Messages are ordered by block number, and within each block, they follow a specific order based on their transaction sequence within that block. +Messages are ordered by block number and IntentId, and within each block, they follow a specific order based on their transaction sequence within that block. This order is immutable. ### Actions The Messages pallet provides for: -- Adding messages for a given schema -- Enabling the retrieval of messages for a given schema +- Adding messages for a given Intent +- Enabling the retrieval of messages for a given Intent ## Interactions ### Extrinsics | Name/Description | Caller | Payment | Key Events | Runtime Added | -| ---------------------------------------------------------------------------------------- | -------- | ------------------ | --------------------------------------------------------------------------------------------------------------------------------- | ------------- | +|------------------------------------------------------------------------------------------|----------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------|---------------| | `add_ipfs_message`
Add a message to a Schema with an `IPFS` payload location | Provider | Capacity or Tokens | [`MessagesInBlock`](https://frequency-chain.github.io/frequency/pallet_messages/pallet/enum.Event.html#variant.MessagesInBlock)\* | 1 | | `add_onchain_message`
Add a message to a Schema with an `ON_CHAIN` payload location | Provider | Capacity or Tokens | [`MessagesInBlock`](https://frequency-chain.github.io/frequency/pallet_messages/pallet/enum.Event.html#variant.MessagesInBlock)\* | 1 | -\* The `MessagesInBlock` may not occur more than once per block and does _not_ indicate which schema received messages. +\* The `MessagesInBlock` may occur at most once per block and does _not_ indicate which Intent(s) received messages. See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/pallet/struct.Pallet.html) for more details. ### State Queries -| Name | Description | Query | Runtime Added | -| --------------- | ----------------------------------------------------------------------------------------------------------------------------- | ------------ | ------------- | -| Get Messages v2 | _Suggested_: Use RPC instead of this storage directly. Storage for the messages by Block number, Schema Id, and Message Index | `messagesV2` | 61 | -| Get Messages v1 | Removed in Runtime 60 | `messages` | 1-60 | +| Name | Description | Query | Runtime Added | +|------------|-------------------------------------------------------------------------------------------------------------------------------------------------------|--------------|---------------| +| MessagesV3 | Suggested: Use custom runtime API instead of querying this storage directly.
Storage for the messages by Block Number, IntentId, and MessageIndex | `messagesV3` | ? | +| MessagesV2 | Removed in Runtime ?? | `messagesV2` | 61 | +| Messages | Removed in Runtime 60 | `messages` | 1-60 | See the [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/pallet/storage_types/index.html) for additional state queries and details. @@ -58,8 +59,19 @@ See the [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/ Note: May be restricted based on node settings and configuration. -| Name | Description | Call | Node Version | -| ------------------------- | ------------------------------------------------------------------------------------------------ | -------------------------------------------------------------------------------------------------------------------------------------------------- | ------------ | -| Get Messages by Schema Id | Fetch paginated messages for a specific Schema Id in the given block range for a given Schema Id | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+ | +| Name | Description | Call | Node Version | +|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|--------------| +| Get Messages by Schema Id | Fetch paginated messages for a specific Schema Id in the given block range for a given Schema Id
Deprecated in `v1.17.?`. Use custome Runtime API `get_messages_by_intent_id` instead | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+ | See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html) for more details. + +### Runtime API + +| Name | Description | Call | API Version Added | Runtime Added | +|-------------------------------------------------|---------------------------------------------------------------------------|-------------------------------|-------------------|---------------| +| Get Schema by Id _(deprecated)_ | Retrieves the schema for the given Schema Id | `getBySchemaId` | 1 | 1 | +| Get Messages by Schema and Block _(deprecated)_ | Retrieve the messages for a particular schema and block number | `getMessagesBySchemaAndBlock` | 1 | 1 | +| Get Messages by Intent and Block | Retrieve the messages for a particular intent and block range (paginated) | `getMessagesByIntentId` | 2 | ? | + +See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages_runtime_api/trait.MessagesRuntimeApi.html) for +more details. diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index d05c8497ae..8d50a30d1e 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -426,13 +426,12 @@ fn add_ipfs_message_with_sha2_256_cid_should_succeed() { new_test_ext().execute_with(|| { let caller_1 = 5u64; - assert_ok!( - MessagesPallet::add_ipfs_message( - RuntimeOrigin::signed(caller_1), - IPFS_SCHEMA_ID, - DUMMY_CID_SHA256.as_bytes().to_vec(), - 15 - )); + assert_ok!(MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_SHA256.as_bytes().to_vec(), + 15 + )); }) } @@ -441,13 +440,12 @@ fn add_ipfs_message_with_blake3_cid_should_succeed() { new_test_ext().execute_with(|| { let caller_1 = 5u64; - assert_ok!( - MessagesPallet::add_ipfs_message( - RuntimeOrigin::signed(caller_1), - IPFS_SCHEMA_ID, - DUMMY_CID_BLAKE3.as_bytes().to_vec(), - 15 - )); + assert_ok!(MessagesPallet::add_ipfs_message( + RuntimeOrigin::signed(caller_1), + IPFS_SCHEMA_ID, + DUMMY_CID_BLAKE3.as_bytes().to_vec(), + 15 + )); }) } #[test] From 9d7e6a17291a5f69fab3dc504f06acc40750e3f7 Mon Sep 17 00:00:00 2001 From: Joe Caputo Date: Thu, 2 Oct 2025 13:25:50 -0400 Subject: [PATCH 7/7] PR comments --- Cargo.lock | 4 +- Cargo.toml | 2 + common/primitives/Cargo.toml | 1 + common/primitives/src/cid.rs | 158 ++++++++++++++++++++++ common/primitives/src/lib.rs | 2 + pallets/messages/Cargo.toml | 5 +- pallets/messages/README.md | 6 +- pallets/messages/src/lib.rs | 18 +-- pallets/messages/src/tests/other_tests.rs | 105 -------------- tools/state-copy/README.md | 7 +- tools/state-copy/schemas.mjs | 10 +- 11 files changed, 182 insertions(+), 136 deletions(-) create mode 100644 common/primitives/src/cid.rs diff --git a/Cargo.lock b/Cargo.lock index 17c40b9c10..19fad8f209 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1561,6 +1561,7 @@ checksum = "2382f75942f4b3be3690fe4f86365e9c853c1587d6ee58212cebf6e2a9ccd101" name = "common-primitives" version = "0.0.0" dependencies = [ + "cid 0.11.1", "enumflags2", "frame-support", "frame-system", @@ -7781,7 +7782,6 @@ dependencies = [ name = "pallet-messages" version = "0.0.0" dependencies = [ - "cid 0.11.1", "common-primitives", "common-runtime", "frame-benchmarking", @@ -7792,10 +7792,8 @@ dependencies = [ 
"pallet-migrations", "parity-scale-codec", "pretty_assertions", - "rand 0.9.1", "scale-info", "serde", - "serde_json", "sp-core", "sp-io", "sp-runtime", diff --git a/Cargo.toml b/Cargo.toml index a85854bc78..0db2406d54 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,6 +20,8 @@ apache-avro = { version = "0.19.0", default-features = false } rand = "0.9.0" parking_lot = "0.12.1" lazy_static = { version = "1.5", features = ["spin_no_std"] } +multibase = { version = "0.9", default-features = false } +cid = { version = "0.11", default-features = false, features = ["alloc"] } # substrate wasm parity-scale-codec = { version = "3.7.4", default-features = false } diff --git a/common/primitives/Cargo.toml b/common/primitives/Cargo.toml index ed3c025610..f7ad44fa79 100644 --- a/common/primitives/Cargo.toml +++ b/common/primitives/Cargo.toml @@ -30,6 +30,7 @@ sp-runtime-interface = { workspace = true } libsecp256k1 = { workspace = true, features = ["hmac"] } log = "0.4.22" lazy_static = { workspace = true } +cid = { workspace = true } [features] default = ['std'] diff --git a/common/primitives/src/cid.rs b/common/primitives/src/cid.rs new file mode 100644 index 0000000000..ea8dbf12d7 --- /dev/null +++ b/common/primitives/src/cid.rs @@ -0,0 +1,158 @@ +#[cfg(test)] +use cid::multibase::Base; +use cid::{multibase, Cid}; +#[cfg(test)] +use frame_support::assert_ok; +use frame_support::ensure; +use sp_io::hashing::sha2_256; +use sp_runtime::Vec; + +/// Multihash type for wrapping digests (support up to 64-byte digests) +type Multihash = cid::multihash::Multihash<64>; + +/// SHA2-256 multihash code +const SHA2_256: u64 = 0x12; +/// BLAKE3 multihash code +const BLAKE3: u64 = 0x1e; + +/// List of hash algorithms supported by DSNP +const DSNP_HASH_ALGORITHMS: &[u64] = &[SHA2_256, BLAKE3]; + +/// Raw codec for CIDv1 (0x55) +const RAW: u64 = 0x55; + +/// Error enum for CID validation +#[derive(Debug, PartialEq)] +pub enum CidError { + /// Unsupported CID version + UnsupportedCidVersion, + /// Unsupported CID hash algorithm + UnsupportedCidMultihash, + /// Multibase decoding error + MultibaseDecodeError, + /// UTF-8 decoding error + Utf8DecodeError, + /// CID parsing error + InvalidCid, +} + +/// Computes a CIDv1 (RAW + SHA2-256 multihash) +pub fn compute_cid_v1(bytes: &[u8]) -> Option> { + let digest = sha2_256(bytes); + let mh = Multihash::wrap(SHA2_256, &digest).ok()?; + let cid = Cid::new_v1(RAW, mh); + Some(cid.to_bytes()) +} + +/// Validates a CID to conform to IPFS CIDv1 (or higher) formatting and allowed multihashes (does not validate decoded CID fields) +pub fn validate_cid(in_cid: &[u8]) -> Result, CidError> { + // Decode SCALE encoded CID into string slice + let cid_str: &str = core::str::from_utf8(in_cid).map_err(|_| CidError::Utf8DecodeError)?; + ensure!(cid_str.len() > 2, CidError::InvalidCid); + // starts_with handles Unicode multibyte characters safely + ensure!(!cid_str.starts_with("Qm"), CidError::UnsupportedCidVersion); + + // Assume it's a multibase-encoded string. Decode it to a byte array so we can parse the CID. 
+ let cid_b = multibase::decode(cid_str).map_err(|_| CidError::MultibaseDecodeError)?.1; + let cid = Cid::read_bytes(&cid_b[..]).map_err(|_| CidError::InvalidCid)?; + ensure!(DSNP_HASH_ALGORITHMS.contains(&cid.hash().code()), CidError::UnsupportedCidMultihash); + + Ok(cid_b) +} + +#[cfg(test)] +const DUMMY_CID_SHA512: &str = "bafkrgqb76pscorjihsk77zpyst3p364zlti6aojlu4nga34vhp7t5orzwbwwytvp7ej44r5yhjzneanqwb5arcnvuvfwo2d4qgzyx5hymvto4"; +#[cfg(test)] +const DUMMY_CID_SHA256: &str = "bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea"; +#[cfg(test)] +const DUMMY_CID_BLAKE3: &str = "bafkr4ihn4xalcdzoyslzy2nvf5q6il7vwqjvdhhatpqpctijrxh6l5xzru"; + +#[test] +fn validate_cid_invalid_utf8_errors() { + let bad_cid = vec![0xfc, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1]; + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected Utf8DecodeError"), + CidError::Utf8DecodeError + ); +} + +#[test] +fn validate_cid_too_short_errors() { + let bad_cid = "a".as_bytes().to_vec(); + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} + +#[test] +fn validate_cid_v0_errors() { + let bad_cid = "Qmxxx".as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected UnsupportedCidVersion"), + CidError::UnsupportedCidVersion + ); +} + +#[test] +fn validate_cid_invalid_multibase_errors() { + let bad_cid = "aaaa".as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected MultibaseDecodeError"), + CidError::MultibaseDecodeError + ); +} + +#[test] +fn validate_cid_invalid_cid_errors() { + let bad_cid = multibase::encode(Base::Base32Lower, "foo").as_bytes().to_vec(); + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} + +#[test] +fn validate_cid_valid_cid_sha2_256_succeeds() { + let cid = DUMMY_CID_SHA256.as_bytes().to_vec(); + assert_ok!(validate_cid(&cid)); +} + +#[test] +fn validate_cid_valid_cid_blake3_succeeds() { + let cid = DUMMY_CID_BLAKE3.as_bytes().to_vec(); + assert_ok!(validate_cid(&cid)); +} + +#[test] +fn validate_cid_invalid_hash_function_errors() { + let bad_cid = DUMMY_CID_SHA512.as_bytes().to_vec(); + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected UnsupportedCidMultihash"), + CidError::UnsupportedCidMultihash + ); +} +#[test] +fn validate_cid_not_valid_multibase() { + // This should not panic, but should return an error. + let bad_cid = vec![55, 197, 136, 0, 0, 0, 0, 0, 0, 0, 0]; + assert_eq!( + validate_cid(&bad_cid).expect_err("Expected MultibaseDecodeError"), + CidError::MultibaseDecodeError + ); +} + +#[test] +fn validate_cid_not_correct_format_errors() { + // This should not panic, but should return an error. + let bad_cid = vec![0, 1, 0, 1, 203, 155, 0, 0, 0, 5, 67]; + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); + + // This should not panic, but should return an error. + let another_bad_cid = vec![241, 0, 0, 0, 0, 0, 128, 132, 132, 132, 58]; + assert_eq!( + validate_cid(&another_bad_cid).expect_err("Expected Utf8DecodeError"), + CidError::Utf8DecodeError + ); +} + +#[test] +fn validate_cid_unwrap_errors() { + // This should not panic, but should return an error. 
+ let bad_cid = vec![102, 70, 70, 70, 70, 70, 70, 70, 70, 48, 48, 48, 54, 53, 53, 48, 48]; + assert_eq!(validate_cid(&bad_cid).expect_err("Expected InvalidCid"), CidError::InvalidCid); +} diff --git a/common/primitives/src/lib.rs b/common/primitives/src/lib.rs index 1db06f6efa..65f8531e42 100644 --- a/common/primitives/src/lib.rs +++ b/common/primitives/src/lib.rs @@ -42,5 +42,7 @@ pub mod offchain; /// Benchmarking helper trait pub mod benchmarks; +/// CID support +pub mod cid; /// Signature support for ethereum pub mod signatures; diff --git a/pallets/messages/Cargo.toml b/pallets/messages/Cargo.toml index a0e9f91f10..d615d90d51 100644 --- a/pallets/messages/Cargo.toml +++ b/pallets/messages/Cargo.toml @@ -25,17 +25,14 @@ sp-io = { workspace = true } sp-runtime = { workspace = true } # Frequency related dependencies common-primitives = { default-features = false, path = "../../common/primitives" } -cid = { version = "0.11", default-features = false } -multibase = { version = "0.9", default-features = false } +multibase = { workspace = true } [dev-dependencies] common-runtime = { path = '../../runtime/common', default-features = false } # Testing dependencies -rand = { workspace = true } pallet-migrations = { workspace = true } pretty_assertions = { workspace = true } serde = { workspace = true, features = ["derive"] } -serde_json = { workspace = true } [features] default = ['std'] diff --git a/pallets/messages/README.md b/pallets/messages/README.md index 35ad45b1c7..dd5f3cad9c 100644 --- a/pallets/messages/README.md +++ b/pallets/messages/README.md @@ -59,9 +59,9 @@ See the [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages/ Note: May be restricted based on node settings and configuration. -| Name | Description | Call | Node Version | -|---------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------|--------------| -| Get Messages by Schema Id | Fetch paginated messages for a specific Schema Id in the given block range for a given Schema Id
Deprecated in `v1.17.?`. Use custome Runtime API `get_messages_by_intent_id` instead | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+ |
+| Name                                      | Description                                                                                                                          | Call                                                                                                                                                | Node Version |
+|-------------------------------------------|--------------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------|--------------|
+| Get Messages by Schema Id _(deprecated)_  | Fetch paginated messages for a specific Schema Id in the given block range.<br/>Deprecated in `v1.17.?`. Use custom Runtime API `get_messages_by_intent_id` instead | [`getBySchemaId`](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html#tymethod.get_messages_by_schema_id) | v1.0.0+      |
 
 See [Rust Docs](https://frequency-chain.github.io/frequency/pallet_messages_rpc/trait.MessagesApiServer.html) for more details.
diff --git a/pallets/messages/src/lib.rs b/pallets/messages/src/lib.rs
index 113e974e2c..cbbfd8353a 100644
--- a/pallets/messages/src/lib.rs
+++ b/pallets/messages/src/lib.rs
@@ -41,6 +41,7 @@ extern crate core;
 use alloc::vec::Vec;
 use common_primitives::{
+	cid::*,
 	messages::*,
 	msa::{
 		DelegatorId, MessageSourceId, MsaLookup, MsaValidator, ProviderId, SchemaGrantValidator,
@@ -57,7 +58,6 @@ pub use pallet::*;
 pub use types::*;
 pub use weights::*;
 
-use cid::Cid;
 use common_primitives::node::BlockNumber;
 use frame_system::pallet_prelude::*;
 
@@ -382,17 +382,9 @@ impl<T: Config> Pallet<T> {
 	/// * [`Error::InvalidCid`] - Unable to parse provided CID
 	///
 	pub fn validate_cid(in_cid: &[u8]) -> Result<Vec<u8>, DispatchError> {
-		// Decode SCALE encoded CID into string slice
-		let cid_str: &str = core::str::from_utf8(in_cid).map_err(|_| Error::<T>::InvalidCid)?;
-		ensure!(cid_str.len() > 2, Error::<T>::InvalidCid);
-		// starts_with handles Unicode multibyte characters safely
-		ensure!(!cid_str.starts_with("Qm"), Error::<T>::UnsupportedCidVersion);
-
-		// Assume it's a multibase-encoded string. Decode it to a byte array so we can parse the CID.
-		let cid_b = multibase::decode(cid_str).map_err(|_| Error::<T>::InvalidCid)?.1;
-		let cid = Cid::read_bytes(&cid_b[..]).map_err(|_| Error::<T>::InvalidCid)?;
-		ensure!([SHA2_256, BLAKE3].contains(&cid.hash().code()), Error::<T>::InvalidCid);
-
-		Ok(cid_b)
+		Ok(validate_cid(in_cid).map_err(|e| match e {
+			CidError::UnsupportedCidVersion => Error::<T>::UnsupportedCidVersion,
+			_ => Error::<T>::InvalidCid,
+		})?)
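
For reference, a minimal usage sketch of the relocated `common_primitives::cid` helpers, separate from the pallet code above. The wrapper name `check_cid` and the sample payload are illustrative assumptions; the CID string reuses the SHA2-256 test constant from `cid.rs`.

    use common_primitives::cid::{compute_cid_v1, validate_cid, CidError};

    fn check_cid() {
        // Derive a binary CIDv1 (RAW codec + SHA2-256 multihash) for an off-chain payload.
        let _cid_bytes = compute_cid_v1(b"example payload").expect("a 32-byte digest fits the 64-byte multihash");

        // Validate a multibase-encoded CIDv1 string supplied by a caller.
        let supplied = b"bagaaierasords4njcts6vs7qvdjfcvgnume4hqohf65zsfguprqphs3icwea";
        match validate_cid(supplied) {
            Ok(_decoded) => { /* multibase-decoded bytes of a supported CID */ },
            Err(CidError::UnsupportedCidVersion) => { /* CIDv0 ("Qm...") is rejected */ },
            Err(_) => { /* anything else is reported as an invalid CID */ },
        }
    }

The pallet-level `validate_cid` shown in the hunk above only maps these `CidError` variants onto the existing pallet errors, so the parsing rules now live in one place.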
} } diff --git a/pallets/messages/src/tests/other_tests.rs b/pallets/messages/src/tests/other_tests.rs index 8d50a30d1e..1fa3331068 100644 --- a/pallets/messages/src/tests/other_tests.rs +++ b/pallets/messages/src/tests/other_tests.rs @@ -514,111 +514,6 @@ fn on_initialize_should_clean_up_temporary_storage() { }); } -#[test] -fn validate_cid_invalid_utf8_errors() { - new_test_ext().execute_with(|| { - let bad_cid = vec![0xfc, 0xa1, 0xa1, 0xa1, 0xa1, 0xa1]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_too_short_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "a".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_v0_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "Qmxxx".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::UnsupportedCidVersion); - }) -} - -#[test] -fn validate_cid_invalid_multibase_errors() { - new_test_ext().execute_with(|| { - let bad_cid = "aaaa".as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_invalid_cid_errors() { - new_test_ext().execute_with(|| { - let bad_cid = multibase::encode(Base::Base32Lower, "foo").as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_valid_cid_sha2_256_succeeds() { - new_test_ext().execute_with(|| { - let cid = DUMMY_CID_BASE32.to_vec(); - - assert_ok!(MessagesPallet::validate_cid(&cid)); - }) -} - -#[test] -fn validate_cid_valid_cid_blake3_succeeds() { - new_test_ext().execute_with(|| { - let cid = DUMMY_CID_BLAKE3.as_bytes().to_vec(); - - assert_ok!(MessagesPallet::validate_cid(&cid)); - }) -} - -#[test] -fn validate_cid_invalid_hash_function_errors() { - new_test_ext().execute_with(|| { - let bad_cid = DUMMY_CID_SHA512.as_bytes().to_vec(); - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} -#[test] -fn validate_cid_not_utf8_aligned_errors() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. - let bad_cid = vec![55, 197, 136, 0, 0, 0, 0, 0, 0, 0, 0]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_not_correct_format_errors() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. - let bad_cid = vec![0, 1, 0, 1, 203, 155, 0, 0, 0, 5, 67]; - - assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::::InvalidCid); - // This should not panic, but should return an error. - let another_bad_cid = vec![241, 0, 0, 0, 0, 0, 128, 132, 132, 132, 58]; - - assert_noop!(MessagesPallet::validate_cid(&another_bad_cid), Error::::InvalidCid); - }) -} - -#[test] -fn validate_cid_unwrap_errors() { - new_test_ext().execute_with(|| { - // This should not panic, but should return an error. 
-	let bad_cid = vec![102, 70, 70, 70, 70, 70, 70, 70, 70, 48, 48, 48, 54, 53, 53, 48, 48];
-
-	assert_noop!(MessagesPallet::validate_cid(&bad_cid), Error::<T>::InvalidCid);
-	})
-}
-
 #[test]
 fn map_to_response_on_chain() {
 	let payload_vec = b"123456789012345678901234567890".to_vec();
diff --git a/tools/state-copy/README.md b/tools/state-copy/README.md
index 5d614ed03d..f233dd2ac3 100644
--- a/tools/state-copy/README.md
+++ b/tools/state-copy/README.md
@@ -8,8 +8,7 @@ To maintain alignment with Mainnet, when a new schema is deployed on Mainnet, Te
 1. In the Frequency codebase: `cd tools/state-copy`
 2. `npm i`
-3. Edit `schemas.mjs`
-   - Use the Testnet `DEST_URL`: `const DEST_URL = "wss://0.rpc.testnet.amplica.io";`
-   - Update the`const SUDO_URI = "//Alice";` to be the SUDO key for Testnet
+3. Set the following environment variables (or run with the `env` command):
+   - `DEST_URL=wss://0.rpc.testnet.amplica.io`
+   - `SUDO_URI=<Testnet SUDO key>`
 4. `npm run schemas`
-5. Remove the changes
diff --git a/tools/state-copy/schemas.mjs b/tools/state-copy/schemas.mjs
index 2b26956943..dff1a1adc3 100644
--- a/tools/state-copy/schemas.mjs
+++ b/tools/state-copy/schemas.mjs
@@ -5,18 +5,20 @@ import { copy } from "./copy.mjs";
 
 // Set up sudo account (assumes //Alice has sudo access)
-const SUDO_URI = "//Alice";
+const SUDO_URI = process.env.SUDO_URI || "//Alice";
 
 // Testnet
 // const DEST_URL = "wss://0.rpc.testnet.amplica.io";
 // const SOURCE_URL = "wss://0.rpc.testnet.amplica.io";
 
 // Localhost
-const DEST_URL = "ws://localhost:9944";
+// const DEST_URL = "ws://localhost:9944";
+
+const DEST_URL = process.env.DEST_URL || "ws://localhost:9944";
 
 // Mainnet
-const SOURCE_URL = "wss://1.rpc.frequency.xyz";
-const STORAGE_KEY = "0xeec6f3c13d26ae2507c99b6751e19e76";
+const SOURCE_URL = process.env.SOURCE_URL || "wss://1.rpc.frequency.xyz";
+const STORAGE_KEY = process.env.STORAGE_KEY || "0xeec6f3c13d26ae2507c99b6751e19e76";
 
 const FILTER_OUT = [
   "0xeec6f3c13d26ae2507c99b6751e19e76d5d9c370c6c8aee1116ee09d6811b0d5", // governanceSchemaModelMaxBytes
   "0xeec6f3c13d26ae2507c99b6751e19e764e7b9012096b41c4eb3aaf947f6ea429", // palletVersion
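
For reference, with the constants above read from the environment, the Testnet copy described in `tools/state-copy/README.md` can be run without editing the script, for example: `DEST_URL=wss://0.rpc.testnet.amplica.io SUDO_URI=<Testnet SUDO key> npm run schemas` (the `SUDO_URI` value shown is a placeholder).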