Skip to content

Commit

Permalink
refactor(core): commit family of syscalls does not keep state between…
Browse files Browse the repository at this point in the history
… executions (#4304)
  • Loading branch information
playX18 authored Nov 13, 2024
1 parent 0790732 commit 7f91d5b
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 164 deletions.
117 changes: 39 additions & 78 deletions core/src/message/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,34 +141,41 @@ impl ContextOutcome {
}
}
}
/// Store of current temporary message execution context.
#[derive(Clone, Default, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Decode, Encode, TypeInfo)]
pub struct OutgoingPayloads {
handles: BTreeMap<u32, Option<Payload>>,
reply: Option<Payload>,
bytes_counter: u32,
}

/// Store of previous message execution context.
#[derive(Clone, Default, Debug, Eq, Hash, Ord, PartialEq, PartialOrd, Decode, Encode, TypeInfo)]
#[cfg_attr(feature = "std", derive(serde::Serialize, serde::Deserialize))]
pub struct ContextStore {
outgoing: BTreeMap<u32, Option<Payload>>,
reply: Option<Payload>,
initialized: BTreeSet<ProgramId>,
reservation_nonce: ReservationNonce,
system_reservation: Option<u64>,
/// Used to prevent creating messages with the same ID in DB. Before this was achieved by using `outgoing.len()`
/// but now it is moved to [OutgoingPayloads] thus we need to keep nonce here. Now to calculate nonce we simple increment `local_nonce`
/// in each `init` call.
local_nonce: u32,
}

impl ContextStore {
// TODO: Remove, only used in migrations (#issue 3721)
/// Create a new context store with the provided parameters.
pub fn new(
outgoing: BTreeMap<u32, Option<Payload>>,
reply: Option<Payload>,
initialized: BTreeSet<ProgramId>,
reservation_nonce: ReservationNonce,
system_reservation: Option<u64>,
local_nonce: u32,
) -> Self {
Self {
outgoing,
reply,
initialized,
reservation_nonce,
system_reservation,
local_nonce,
}
}

Expand Down Expand Up @@ -208,8 +215,8 @@ pub struct MessageContext {
current: IncomingMessage,
outcome: ContextOutcome,
store: ContextStore,
outgoing_payloads: OutgoingPayloads,
settings: ContextSettings,
outgoing_bytes_counter: u32,
}

impl MessageContext {
Expand All @@ -222,29 +229,13 @@ impl MessageContext {
) -> Option<Self> {
let (kind, message, store) = dispatch.into_parts();

let outgoing_bytes_counter = match &store {
Some(store) => {
let mut counter = 0u32;
for payload in store.outgoing.values().filter_map(|x| x.as_ref()) {
counter = counter.checked_add(payload.len_u32())?;
}
counter
}
None => 0,
};

if outgoing_bytes_counter > settings.outgoing_bytes_limit {
// Outgoing messages bytes limit exceeded.
return None;
}

Some(Self {
kind,
outcome: ContextOutcome::new(program_id, message.source(), message.id()),
current: message,
store: store.unwrap_or_default(),
outgoing_payloads: OutgoingPayloads::default(),
settings,
outgoing_bytes_counter,
})
}

Expand Down Expand Up @@ -288,16 +279,16 @@ impl MessageContext {
return Err(Error::DuplicateInit);
}

let last = self.store.outgoing.len() as u32;
let last = self.store.local_nonce;

if last >= self.settings.outgoing_limit {
return Err(Error::OutgoingMessagesAmountLimitExceeded);
}

let message_id = MessageId::generate_outgoing(self.current.id(), last);
let message = InitMessage::from_packet(message_id, packet);

self.store.outgoing.insert(last, None);
self.store.local_nonce += 1;
self.outgoing_payloads.handles.insert(last, None);
self.store.initialized.insert(program_id);
self.outcome.init.push((message, delay, None));

Expand All @@ -316,15 +307,15 @@ impl MessageContext {
reservation: Option<ReservationId>,
) -> Result<MessageId, Error> {
let outgoing = self
.store
.outgoing
.outgoing_payloads
.handles
.get_mut(&handle)
.ok_or(Error::OutOfBounds)?;
let data = outgoing.take().ok_or(Error::LateAccess)?;

let do_send_commit = || {
let Some(new_outgoing_bytes) = Self::increase_counter(
self.outgoing_bytes_counter,
self.outgoing_payloads.bytes_counter,
packet.payload_len(),
self.settings.outgoing_bytes_limit,
) else {
Expand All @@ -347,7 +338,7 @@ impl MessageContext {
// store outgoing messages (see `Self::new`),
// so committed during this execution messages won't be taken into account
// during next executions.
self.outgoing_bytes_counter = new_outgoing_bytes;
self.outgoing_payloads.bytes_counter = new_outgoing_bytes;

Ok(message_id)
};
Expand All @@ -362,10 +353,12 @@ impl MessageContext {
///
/// Returns it's handle.
pub fn send_init(&mut self) -> Result<u32, Error> {
let last = self.store.outgoing.len() as u32;

let last = self.store.local_nonce;
if last < self.settings.outgoing_limit {
self.store.outgoing.insert(last, Some(Default::default()));
self.store.local_nonce += 1;
self.outgoing_payloads
.handles
.insert(last, Some(Default::default()));

Ok(last)
} else {
Expand All @@ -375,14 +368,14 @@ impl MessageContext {

/// Pushes payload into stored payload by handle.
pub fn send_push(&mut self, handle: u32, buffer: &[u8]) -> Result<(), Error> {
let data = match self.store.outgoing.get_mut(&handle) {
let data = match self.outgoing_payloads.handles.get_mut(&handle) {
Some(Some(data)) => data,
Some(None) => return Err(Error::LateAccess),
None => return Err(Error::OutOfBounds),
};

let new_outgoing_bytes = Self::increase_counter(
self.outgoing_bytes_counter,
self.outgoing_payloads.bytes_counter,
buffer.len(),
self.settings.outgoing_bytes_limit,
)
Expand All @@ -391,14 +384,14 @@ impl MessageContext {
data.try_extend_from_slice(buffer)
.map_err(|_| Error::MaxMessageSizeExceed)?;

self.outgoing_bytes_counter = new_outgoing_bytes;
self.outgoing_payloads.bytes_counter = new_outgoing_bytes;

Ok(())
}

/// Pushes the incoming buffer/payload into stored payload by handle.
pub fn send_push_input(&mut self, handle: u32, range: CheckedRange) -> Result<(), Error> {
let data = match self.store.outgoing.get_mut(&handle) {
let data = match self.outgoing_payloads.handles.get_mut(&handle) {
Some(Some(data)) => data,
Some(None) => return Err(Error::LateAccess),
None => return Err(Error::OutOfBounds),
Expand All @@ -411,7 +404,7 @@ impl MessageContext {
} = range;

let new_outgoing_bytes = Self::increase_counter(
self.outgoing_bytes_counter,
self.outgoing_payloads.bytes_counter,
bytes_amount,
self.settings.outgoing_bytes_limit,
)
Expand All @@ -420,7 +413,7 @@ impl MessageContext {
data.try_extend_from_slice(&self.current.payload_bytes()[offset..excluded_end])
.map_err(|_| Error::MaxMessageSizeExceed)?;

self.outgoing_bytes_counter = new_outgoing_bytes;
self.outgoing_payloads.bytes_counter = new_outgoing_bytes;

Ok(())
}
Expand Down Expand Up @@ -467,10 +460,10 @@ impl MessageContext {
return Err(Error::DuplicateReply.into());
}

let data = self.store.reply.take().unwrap_or_default();
let data = self.outgoing_payloads.reply.take().unwrap_or_default();

if let Err(data) = packet.try_prepend(data) {
self.store.reply = Some(data);
self.outgoing_payloads.reply = Some(data);
return Err(Error::MaxMessageSizeExceed.into());
}

Expand All @@ -491,7 +484,7 @@ impl MessageContext {
}

// NOTE: it's normal to not undone `get_or_insert_with` in case of error
self.store
self.outgoing_payloads
.reply
.get_or_insert_with(Default::default)
.try_extend_from_slice(buffer)
Expand All @@ -517,7 +510,7 @@ impl MessageContext {
} = range;

// NOTE: it's normal to not undone `get_or_insert_with` in case of error
self.store
self.outgoing_payloads
.reply
.get_or_insert_with(Default::default)
.try_extend_from_slice(&self.current.payload_bytes()[offset..excluded_end])
Expand Down Expand Up @@ -816,36 +809,6 @@ mod tests {
);
}

#[test]
fn create_wrong_context() {
let context_store = ContextStore {
outgoing: [(1, Some(vec![1, 2].try_into().unwrap()))]
.iter()
.cloned()
.collect(),
reply: None,
initialized: BTreeSet::new(),
reservation_nonce: ReservationNonce::default(),
system_reservation: None,
};

let incoming_dispatch = IncomingDispatch::new(
DispatchKind::Handle,
Default::default(),
Some(context_store),
);

let ctx = MessageContext::new(
incoming_dispatch,
Default::default(),
ContextSettings::with_outgoing_limits(1024, 1),
);

// Creating a message context must return None,
// because of the outgoing messages bytes limit exceeded.
assert!(ctx.is_none(), "Expect None, got {:?}", ctx);
}

#[test]
fn outgoing_limit_exceeded() {
// Check that we can always send exactly outgoing_limit messages.
Expand Down Expand Up @@ -979,8 +942,6 @@ mod tests {

// Checking that the initial parameters of the context match the passed constants
assert_eq!(context.current().id(), MessageId::from(INCOMING_MESSAGE_ID));
assert!(context.store.reply.is_none());
assert!(context.outcome.reply.is_none());

// Creating a reply packet
let reply_packet = ReplyPacket::new(vec![0, 0].try_into().unwrap(), 0);
Expand Down Expand Up @@ -1038,8 +999,8 @@ mod tests {

// And checking that it is not formed
assert!(context
.store
.outgoing
.outgoing_payloads
.handles
.get(&expected_handle)
.expect("This key should be")
.is_some());
Expand Down
16 changes: 1 addition & 15 deletions gsdk/src/metadata/generated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,27 +780,13 @@ pub mod runtime_types {
Debug, crate::gp::Decode, crate::gp::DecodeAsType, crate::gp::Encode,
)]
pub struct ContextStore {
pub outgoing: ::subxt::ext::subxt_core::utils::KeyedVec<
::core::primitive::u32,
::core::option::Option<
runtime_types::gear_core::buffer::LimitedVec<
::core::primitive::u8,
runtime_types::gear_core::message::PayloadSizeError,
>,
>,
>,
pub reply: ::core::option::Option<
runtime_types::gear_core::buffer::LimitedVec<
::core::primitive::u8,
runtime_types::gear_core::message::PayloadSizeError,
>,
>,
pub initialized: ::subxt::ext::subxt_core::alloc::vec::Vec<
runtime_types::gprimitives::ActorId,
>,
pub reservation_nonce:
runtime_types::gear_core::reservation::ReservationNonce,
pub system_reservation: ::core::option::Option<::core::primitive::u64>,
pub local_nonce: ::core::primitive::u32,
}
}
pub mod stored {
Expand Down
3 changes: 2 additions & 1 deletion pallets/gear-messenger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ mod mock;
#[cfg(test)]
mod tests;

pub mod migrations;
pub mod pallet_tests;

// Public exports from pallet.
Expand All @@ -166,7 +167,7 @@ pub mod pallet {
use sp_std::{convert::TryInto, marker::PhantomData};

/// The current storage version.
pub(crate) const MESSENGER_STORAGE_VERSION: StorageVersion = StorageVersion::new(3);
pub(crate) const MESSENGER_STORAGE_VERSION: StorageVersion = StorageVersion::new(4);

// Gear Messenger Pallet's `Config`.
#[pallet::config]
Expand Down
Loading

0 comments on commit 7f91d5b

Please sign in to comment.