Skip to content

Commit

Permalink
Add events to BlockExecutionOutcome.
Browse files Browse the repository at this point in the history
  • Loading branch information
afck committed Jul 15, 2024
1 parent 91ded01 commit 48bc1a7
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 9 deletions.
1 change: 1 addition & 0 deletions linera-base/src/identifiers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -832,6 +832,7 @@ doc_scalar!(
ChainDescription."
);
doc_scalar!(ChannelName, "The name of a subscription channel");
doc_scalar!(StreamName, "The name of an event stream");
bcs_scalar!(MessageId, "The index of a message in a chain");
doc_scalar!(
Owner,
Expand Down
30 changes: 24 additions & 6 deletions linera-chain/src/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ use {linera_base::identifiers::BytecodeId, linera_execution::BytecodeLocation};

use crate::{
data_types::{
Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, Event, IncomingMessage,
MessageAction, MessageBundle, Origin, OutgoingMessage, Target,
Block, BlockExecutionOutcome, ChainAndHeight, ChannelFullName, Event, EventRecord,
IncomingMessage, MessageAction, MessageBundle, Origin, OutgoingMessage, Target,
},
inbox::{Cursor, InboxError, InboxStateView},
manager::ChainManager,
Expand Down Expand Up @@ -803,6 +803,7 @@ where
);
let mut oracle_responses = oracle_responses.map(Vec::into_iter);
let mut new_oracle_responses = Vec::new();
let mut events = Vec::new();
let mut next_message_index = 0;
for (index, message) in block.incoming_messages.iter().enumerate() {
#[cfg(with_metrics)]
Expand Down Expand Up @@ -910,7 +911,7 @@ where
}
};
new_oracle_responses.push(oracle_responses);
let messages_out = self
let (messages_out, new_events) = self
.process_execution_outcomes(context.height, outcomes)
.await?;
if let MessageAction::Accept = message.action {
Expand All @@ -925,6 +926,7 @@ where
next_message_index +=
u32::try_from(messages_out.len()).map_err(|_| ArithmeticError::Overflow)?;
messages.push(messages_out);
events.push(new_events);
}
// Second, execute the operations in the block and remember the recipients to notify.
for (index, operation) in block.operations.iter().enumerate() {
Expand Down Expand Up @@ -959,7 +961,7 @@ where
.await
.map_err(|err| ChainError::ExecutionError(err, chain_execution_context))?;
new_oracle_responses.push(oracle_responses);
let messages_out = self
let (messages_out, new_events) = self
.process_execution_outcomes(context.height, outcomes)
.await?;
resource_controller
Expand All @@ -977,6 +979,7 @@ where
next_message_index +=
u32::try_from(messages_out.len()).map_err(|_| ArithmeticError::Overflow)?;
messages.push(messages_out);
events.push(new_events);
}

// Finally, charge for the block fee, except if the chain is closed. Closed chains should
Expand Down Expand Up @@ -1031,22 +1034,25 @@ where
messages,
state_hash,
oracle_responses: new_oracle_responses,
events,
})
}

async fn process_execution_outcomes(
&mut self,
height: BlockHeight,
results: Vec<ExecutionOutcome>,
) -> Result<Vec<OutgoingMessage>, ChainError> {
) -> Result<(Vec<OutgoingMessage>, Vec<EventRecord>), ChainError> {
let mut messages = Vec::new();
let mut events = Vec::new();
for result in results {
match result {
ExecutionOutcome::System(result) => {
self.process_raw_execution_outcome(
GenericApplicationId::System,
Message::System,
&mut messages,
&mut events,
height,
result,
)
Expand All @@ -1060,27 +1066,39 @@ where
bytes,
},
&mut messages,
&mut events,
height,
result,
)
.await?;
}
}
}
Ok(messages)
Ok((messages, events))
}

async fn process_raw_execution_outcome<E, F>(
&mut self,
application_id: GenericApplicationId,
lift: F,
messages: &mut Vec<OutgoingMessage>,
events: &mut Vec<EventRecord>,
height: BlockHeight,
raw_outcome: RawExecutionOutcome<E, Amount>,
) -> Result<(), ChainError>
where
F: Fn(E) -> Message,
{
events.extend(
raw_outcome
.events
.into_iter()
.map(|(stream_name, payload)| EventRecord {
application_id,
stream_name,
payload,
}),
);
let max_stream_queries = self.context().max_stream_queries();
// Record the messages of the execution. Messages are understood within an
// application.
Expand Down
15 changes: 15 additions & 0 deletions linera-chain/src/data_types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use linera_base::{
doc_scalar, ensure,
identifiers::{
Account, BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner,
StreamName,
},
};
use linera_execution::{
Expand Down Expand Up @@ -272,9 +273,23 @@ pub struct ExecutedBlock {
pub struct BlockExecutionOutcome {
/// The list of outgoing messages for each transaction.
pub messages: Vec<Vec<OutgoingMessage>>,
/// The hash of the chain's execution state after this block.
pub state_hash: CryptoHash,
/// The record of oracle responses for each transaction.
pub oracle_responses: Vec<Vec<OracleResponse>>,
/// The list of events produced by each transaction.
pub events: Vec<Vec<EventRecord>>,
}

/// An event recorded in an executed block.
#[derive(Debug, PartialEq, Eq, Hash, Clone, Serialize, Deserialize, SimpleObject)]
pub struct EventRecord {
/// The application that emitted this event.
pub application_id: GenericApplicationId,
/// The name of the event stream.
pub stream_name: StreamName,
/// The payload data.
pub payload: Vec<u8>,
}

/// A statement to be certified by the validators.
Expand Down
2 changes: 2 additions & 0 deletions linera-chain/src/unit_tests/data_types_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ fn test_signed_values() {
messages: vec![Vec::new()],
state_hash: CryptoHash::test_hash("state"),
oracle_responses: vec![Vec::new()],
events: vec![Vec::new()],
}
.with(block);
let value = HashedCertificateValue::new_confirmed(executed_block);
Expand Down Expand Up @@ -47,6 +48,7 @@ fn test_certificates() {
messages: vec![Vec::new()],
state_hash: CryptoHash::test_hash("state"),
oracle_responses: vec![Vec::new()],
events: vec![Vec::new()],
}
.with(block);
let value = HashedCertificateValue::new_confirmed(executed_block);
Expand Down
7 changes: 7 additions & 0 deletions linera-core/src/unit_tests/wasm_worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ where
kind: MessageKind::Protected,
message: Message::System(publish_message.clone()),
}]],
events: vec![Vec::new()],
state_hash: publisher_state_hash,
oracle_responses: vec![Vec::new()],
}
Expand Down Expand Up @@ -216,6 +217,7 @@ where
let failing_broadcast_block_proposal = HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![vec![failing_broadcast_outgoing_message]],
events: vec![Vec::new()],
state_hash: publisher_state_hash,
oracle_responses: vec![Vec::new()],
}
Expand All @@ -240,6 +242,7 @@ where
let broadcast_block_proposal = HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![vec![broadcast_outgoing_message]],
events: vec![Vec::new()],
state_hash: publisher_state_hash,
oracle_responses: vec![Vec::new()],
}
Expand Down Expand Up @@ -294,6 +297,7 @@ where
kind: MessageKind::Protected,
message: Message::System(subscribe_message.clone()),
}]],
events: vec![Vec::new()],
state_hash: creator_state.crypto_hash().await?,
oracle_responses: vec![Vec::new()],
}
Expand Down Expand Up @@ -346,6 +350,7 @@ where
id: creator_chain.into(),
}),
}]],
events: vec![Vec::new()],
state_hash: publisher_state_hash,
oracle_responses: vec![Vec::new()],
}
Expand Down Expand Up @@ -443,6 +448,7 @@ where
message: Message::System(SystemMessage::ApplicationCreated),
}],
],
events: vec![Vec::new()],
state_hash: creator_state.crypto_hash().await?,
oracle_responses: vec![Vec::new(); 2],
}
Expand Down Expand Up @@ -496,6 +502,7 @@ where
let run_block_proposal = HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![Vec::new()],
events: vec![Vec::new()],
state_hash: creator_state.crypto_hash().await?,
oracle_responses: vec![Vec::new()],
}
Expand Down
20 changes: 17 additions & 3 deletions linera-core/src/unit_tests/worker_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,13 +296,14 @@ where
}
Recipient::Burn => messages.push(Vec::new()),
}
let oracle_responses = iter::repeat_with(Vec::new)
.take(block.operations.len() + block.incoming_messages.len())
.collect();
let tx_count = block.operations.len() + block.incoming_messages.len();
let oracle_responses = iter::repeat_with(Vec::new).take(tx_count).collect();
let events = iter::repeat_with(Vec::new).take(tx_count).collect();
let state_hash = system_state.into_hash().await;
let value = HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages,
events,
state_hash,
oracle_responses,
}
Expand Down Expand Up @@ -708,6 +709,7 @@ where
Amount::from_tokens(2),
)],
],
events: vec![Vec::new(); 3],
state_hash: SystemExecutionState {
committees: [(epoch, committee.clone())].into_iter().collect(),
ownership: ChainOwnership::single(sender_key_pair.public()),
Expand Down Expand Up @@ -735,6 +737,7 @@ where
ChainId::root(2),
Amount::from_tokens(3),
)]],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
committees: [(epoch, committee.clone())].into_iter().collect(),
ownership: ChainOwnership::single(sender_key_pair.public()),
Expand Down Expand Up @@ -997,6 +1000,7 @@ where
Vec::new(),
vec![direct_credit_message(ChainId::root(3), Amount::ONE)],
],
events: vec![Vec::new(); 2],
state_hash: SystemExecutionState {
committees: [(epoch, committee.clone())].into_iter().collect(),
ownership: ChainOwnership::single(recipient_key_pair.public()),
Expand Down Expand Up @@ -1286,6 +1290,7 @@ where
let value = HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![Vec::new()],
events: vec![Vec::new()],
state_hash: state.into_hash().await,
oracle_responses: vec![Vec::new()],
}
Expand Down Expand Up @@ -2326,6 +2331,7 @@ where
},
),
]],
events: vec![Vec::new(); 2],
state_hash: SystemExecutionState {
committees: committees.clone(),
ownership: ChainOwnership::single(key_pair.public()),
Expand Down Expand Up @@ -2389,6 +2395,7 @@ where
})],
vec![direct_credit_message(user_id, Amount::from_tokens(2))],
],
events: vec![Vec::new(); 2],
state_hash: SystemExecutionState {
// The root chain knows both committees at the end.
committees: committees2.clone(),
Expand Down Expand Up @@ -2424,6 +2431,7 @@ where
MessageKind::Protected,
SystemMessage::Notify { id: user_id },
)]],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
// The root chain knows both committees at the end.
committees: committees2.clone(),
Expand Down Expand Up @@ -2548,6 +2556,7 @@ where
HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![Vec::new(); 4],
events: vec![Vec::new(); 4],
state_hash: SystemExecutionState {
subscriptions: [ChannelSubscription {
chain_id: admin_id,
Expand Down Expand Up @@ -2726,6 +2735,7 @@ where
HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![vec![direct_credit_message(admin_id, Amount::ONE)]],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
committees: committees.clone(),
ownership: ChainOwnership::single(key_pair1.public()),
Expand Down Expand Up @@ -2753,6 +2763,7 @@ where
epoch: Epoch::from(1),
committees: committees2.clone(),
})]],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
committees: committees2.clone(),
ownership: ChainOwnership::single(key_pair0.public()),
Expand Down Expand Up @@ -2854,6 +2865,7 @@ where
HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![vec![direct_credit_message(admin_id, Amount::ONE)]],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
committees: committees.clone(),
ownership: ChainOwnership::single(key_pair1.public()),
Expand Down Expand Up @@ -2888,6 +2900,7 @@ where
committees: committees3.clone(),
})],
],
events: vec![Vec::new(); 2],
state_hash: SystemExecutionState {
committees: committees3.clone(),
ownership: ChainOwnership::single(key_pair0.public()),
Expand Down Expand Up @@ -2948,6 +2961,7 @@ where
HashedCertificateValue::new_confirmed(
BlockExecutionOutcome {
messages: vec![Vec::new()],
events: vec![Vec::new()],
state_hash: SystemExecutionState {
committees: committees3.clone(),
ownership: ChainOwnership::single(key_pair0.public()),
Expand Down
14 changes: 14 additions & 0 deletions linera-rpc/tests/snapshots/format__format.yaml.snap
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ BlockExecutionOutcome:
SEQ:
SEQ:
TYPENAME: OracleResponse
- events:
SEQ:
SEQ:
TYPENAME: EventRecord
BlockHeight:
NEWTYPESTRUCT: U64
BlockHeightRange:
Expand Down Expand Up @@ -393,6 +397,14 @@ Event:
TYPENAME: Timestamp
- message:
TYPENAME: Message
EventRecord:
STRUCT:
- application_id:
TYPENAME: GenericApplicationId
- stream_name:
TYPENAME: StreamName
- payload:
SEQ: U8
ExecutedBlock:
STRUCT:
- block:
Expand Down Expand Up @@ -807,6 +819,8 @@ Signature:
TUPLEARRAY:
CONTENT: U8
SIZE: 64
StreamName:
NEWTYPESTRUCT: BYTES
SystemChannel:
ENUM:
0:
Expand Down
Loading

0 comments on commit 48bc1a7

Please sign in to comment.