diff --git a/applications/tari_indexer/src/event_scanner.rs b/applications/tari_indexer/src/event_scanner.rs index 19aa28ed2..b56b1b8c5 100644 --- a/applications/tari_indexer/src/event_scanner.rs +++ b/applications/tari_indexer/src/event_scanner.rs @@ -29,7 +29,7 @@ use tari_bor::decode; use tari_crypto::tari_utilities::message_format::MessageFormat; use tari_dan_common_types::{committee::Committee, Epoch, PeerAddress, ShardGroup}; use tari_dan_p2p::proto::rpc::{GetTransactionResultRequest, PayloadResultStatus, SyncBlocksRequest}; -use tari_dan_storage::consensus_models::{Block, BlockId, Decision, TransactionRecord}; +use tari_dan_storage::consensus_models::{Block, BlockId, Decision}; use tari_engine_types::{ commit_result::{ExecuteResult, TransactionResult}, events::Event, @@ -37,7 +37,7 @@ use tari_engine_types::{ }; use tari_epoch_manager::{base_layer::EpochManagerHandle, EpochManagerReader}; use tari_template_lib::models::{EntityId, TemplateAddress}; -use tari_transaction::{Transaction, TransactionId}; +use tari_transaction::TransactionId; use tari_validator_node_rpc::client::{TariValidatorNodeRpcClientFactory, ValidatorNodeClientFactory}; use crate::{ @@ -563,6 +563,10 @@ impl EventScanner { .sync_blocks(SyncBlocksRequest { start_block_id: start_block_id.map(|id| id.as_bytes().to_vec()).unwrap_or_default(), epoch: Some(up_to_epoch.into()), + // TODO: QC should be validated + stream_qcs: false, + stream_substates: false, + stream_transactions: false, }) .await?; while let Some(resp) = stream.next().await { @@ -573,38 +577,6 @@ impl EventScanner { .ok_or_else(|| anyhow::anyhow!("Expected peer to return a newblock"))?; let block = Block::try_from(new_block)?; - let Some(_) = stream.next().await else { - anyhow::bail!("Peer closed session before sending QC message") - }; - - let Some(resp) = stream.next().await else { - anyhow::bail!("Peer closed session before sending substate update count message") - }; - let msg = resp?; - let num_substates = - msg.substate_count() - .ok_or_else(|| anyhow::anyhow!("Expected peer to return substate count"))? as usize; - - for _ in 0..num_substates { - let Some(_) = stream.next().await else { - anyhow::bail!("Peer closed session before sending substate updates message") - }; - } - - let Some(resp) = stream.next().await else { - anyhow::bail!("Peer closed session before sending transactions message") - }; - let msg = resp?; - let transactions = msg - .into_transactions() - .ok_or_else(|| anyhow::anyhow!("Expected peer to return transactions"))?; - - let _transactions = transactions - .into_iter() - .map(Transaction::try_from) - .map(|r| r.map(TransactionRecord::new)) - .collect::, _>>()?; - blocks.push(block); } diff --git a/applications/tari_indexer_web_ui/package-lock.json b/applications/tari_indexer_web_ui/package-lock.json index ffa288889..0d93bbada 100644 --- a/applications/tari_indexer_web_ui/package-lock.json +++ b/applications/tari_indexer_web_ui/package-lock.json @@ -37,7 +37,7 @@ }, "../../bindings": { "name": "@tari-project/typescript-bindings", - "version": "1.3.2", + "version": "1.4.0", "license": "ISC", "devDependencies": { "shx": "^0.3.4", diff --git a/applications/tari_validator_node/src/json_rpc/handlers.rs b/applications/tari_validator_node/src/json_rpc/handlers.rs index 9df4ea4cc..fec4144c3 100644 --- a/applications/tari_validator_node/src/json_rpc/handlers.rs +++ b/applications/tari_validator_node/src/json_rpc/handlers.rs @@ -231,9 +231,22 @@ impl JsonRpcHandlers { let tx = self.state_store.create_read_tx().unwrap(); match SubstateRecord::get(&tx, &request.address).optional() { - Ok(Some(state)) => Ok(JsonRpcResponse::success(answer_id, GetStateResponse { - data: state.into_substate().to_bytes(), - })), + Ok(Some(state)) => { + let Some(substate) = state.into_substate() else { + return Err(JsonRpcResponse::error( + answer_id, + JsonRpcError::new( + JsonRpcErrorReason::ApplicationError(100), + format!("Substate {} is DOWN", request.address), + json::Value::Null, + ), + )); + }; + + Ok(JsonRpcResponse::success(answer_id, GetStateResponse { + data: substate.to_bytes(), + })) + }, Ok(None) => Err(JsonRpcResponse::error( answer_id, JsonRpcError::new( @@ -372,7 +385,7 @@ impl JsonRpcHandlers { Some(substate) => Ok(JsonRpcResponse::success(answer_id, GetSubstateResponse { status: SubstateStatus::Up, created_by_tx: Some(substate.created_by_transaction), - value: Some(substate.into_substate_value()), + value: substate.into_substate_value(), })), None => Ok(JsonRpcResponse::success(answer_id, GetSubstateResponse { status: SubstateStatus::DoesNotExist, diff --git a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs index fd43199dd..3e03a4059 100644 --- a/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs +++ b/applications/tari_validator_node/src/p2p/rpc/block_sync_task.rs @@ -5,7 +5,10 @@ use std::collections::HashSet; use log::*; use tari_dan_common_types::{optional::Optional, Epoch}; -use tari_dan_p2p::proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse, Transactions}; +use tari_dan_p2p::{ + proto, + proto::rpc::{sync_blocks_response::SyncData, QuorumCertificates, SyncBlocksResponse}, +}; use tari_dan_storage::{ consensus_models::{Block, BlockId, QuorumCertificate, SubstateUpdate, TransactionRecord}, StateStore, @@ -19,12 +22,12 @@ const LOG_TARGET: &str = "tari::dan::rpc::sync_task"; const BLOCK_BUFFER_SIZE: usize = 15; -type BlockData = ( - Block, - Vec, - Vec, - Vec, -); +struct BlockData { + block: Block, + qcs: Vec, + substates: Vec, + transactions: Vec, +} type BlockBuffer = Vec; pub struct BlockSyncTask { @@ -49,12 +52,12 @@ impl BlockSyncTask { } } - pub async fn run(mut self) -> Result<(), ()> { + pub async fn run(mut self, req: proto::rpc::SyncBlocksRequest) -> Result<(), ()> { let mut buffer = Vec::with_capacity(BLOCK_BUFFER_SIZE); let mut current_block_id = self.start_block_id; let mut counter = 0; loop { - match self.fetch_next_batch(&mut buffer, ¤t_block_id) { + match self.fetch_next_batch(&mut buffer, ¤t_block_id, &req) { Ok(last_block) => { current_block_id = last_block; }, @@ -74,7 +77,7 @@ impl BlockSyncTask { counter += buffer.len(); for data in buffer.drain(..) { - self.send_block_data(data).await?; + self.send_block_data(&req, data).await?; } // If we didn't fill up the buffer, send the final blocks @@ -99,13 +102,18 @@ impl BlockSyncTask { ); for data in buffer.drain(..) { - self.send_block_data(data).await?; + self.send_block_data(&req, data).await?; } Ok(()) } - fn fetch_next_batch(&self, buffer: &mut BlockBuffer, current_block_id: &BlockId) -> Result { + fn fetch_next_batch( + &self, + buffer: &mut BlockBuffer, + current_block_id: &BlockId, + req: &proto::rpc::SyncBlocksRequest, + ) -> Result { self.store.with_read_tx(|tx| { let mut current_block_id = *current_block_id; let mut last_block_id = current_block_id; @@ -143,17 +151,35 @@ impl BlockSyncTask { } last_block_id = current_block_id; - let all_qcs = child - .commands() - .iter() - .filter_map(|cmd| cmd.transaction()) - .flat_map(|transaction| transaction.evidence.qc_ids_iter()) - .collect::>(); + let all_qcs = req + .stream_qcs + .then(|| { + child + .commands() + .iter() + .filter_map(|cmd| cmd.transaction()) + .flat_map(|transaction| transaction.evidence.qc_ids_iter()) + .collect::>() + }) + .unwrap_or_default(); let certificates = QuorumCertificate::get_all(tx, all_qcs)?; - let updates = child.get_substate_updates(tx)?; - let transactions = child.get_transactions(tx)?; + let updates = req + .stream_substates + .then(|| child.get_substate_updates(tx)) + .transpose()? + .unwrap_or_default(); + let transactions = req + .stream_transactions + .then(|| child.get_transactions(tx)) + .transpose()? + .unwrap_or_default(); - buffer.push((child, certificates, updates, transactions)); + buffer.push(BlockData { + block: child, + qcs: certificates, + substates: updates, + transactions, + }); if buffer.len() == buffer.capacity() { break; } @@ -226,43 +252,69 @@ impl BlockSyncTask { Ok(()) } - async fn send_block_data(&mut self, (block, qcs, updates, transactions): BlockData) -> Result<(), ()> { + async fn send_block_data(&mut self, req: &proto::rpc::SyncBlocksRequest, data: BlockData) -> Result<(), ()> { + let BlockData { + block, + qcs, + substates: updates, + transactions, + } = data; self.send(Ok(SyncBlocksResponse { sync_data: Some(SyncData::Block((&block).into())), })) .await?; - self.send(Ok(SyncBlocksResponse { - sync_data: Some(SyncData::QuorumCertificates(QuorumCertificates { - quorum_certificates: qcs.iter().map(Into::into).collect(), - })), - })) - .await?; - match u32::try_from(updates.len()) { - Ok(count) => { - self.send(Ok(SyncBlocksResponse { - sync_data: Some(SyncData::SubstateCount(count)), - })) - .await?; - }, - Err(_) => { - self.send(Err(RpcStatus::general("number of substates exceeds u32"))) - .await?; - return Err(()); - }, - } - for update in updates { + + if req.stream_qcs { self.send(Ok(SyncBlocksResponse { - sync_data: Some(SyncData::SubstateUpdate(update.into())), + sync_data: Some(SyncData::QuorumCertificates(QuorumCertificates { + quorum_certificates: qcs.iter().map(Into::into).collect(), + })), })) .await?; } + if req.stream_substates { + match u32::try_from(updates.len()) { + Ok(count) => { + self.send(Ok(SyncBlocksResponse { + sync_data: Some(SyncData::SubstateCount(count)), + })) + .await?; + }, + Err(_) => { + self.send(Err(RpcStatus::general("number of substates exceeds u32"))) + .await?; + return Err(()); + }, + } + for update in updates { + self.send(Ok(SyncBlocksResponse { + sync_data: Some(SyncData::SubstateUpdate(update.into())), + })) + .await?; + } + } - self.send(Ok(SyncBlocksResponse { - sync_data: Some(SyncData::Transactions(Transactions { - transactions: transactions.iter().map(|t| &t.transaction).map(Into::into).collect(), - })), - })) - .await?; + if req.stream_transactions { + match u32::try_from(transactions.len()) { + Ok(count) => { + self.send(Ok(SyncBlocksResponse { + sync_data: Some(SyncData::TransactionCount(count)), + })) + .await?; + }, + Err(_) => { + self.send(Err(RpcStatus::general("number of substates exceeds u32"))) + .await?; + return Err(()); + }, + } + for transaction in transactions { + self.send(Ok(SyncBlocksResponse { + sync_data: Some(SyncData::Transaction(transaction.transaction().into())), + })) + .await?; + } + } Ok(()) } diff --git a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs index ddd685a1b..3739d1a4e 100644 --- a/applications/tari_validator_node/src/p2p/rpc/service_impl.rs +++ b/applications/tari_validator_node/src/p2p/rpc/service_impl.rs @@ -207,7 +207,10 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { status: SubstateStatus::Up as i32, address: substate.substate_id().to_bytes(), version: substate.version(), - substate: substate.substate_value().to_bytes(), + substate: substate + .substate_value() + .map(|v| v.to_bytes()) + .ok_or_else(|| RpcStatus::general("NEVER HAPPEN: UP substate has no value"))?, created_transaction_hash: substate.created_by_transaction().into_array().to_vec(), destroyed_transaction_hash: vec![], quorum_certificates: vec![(&created_qc).into()], @@ -278,7 +281,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { .await .map_err(RpcStatus::log_internal_error(LOG_TARGET))?; - let start_block_id = Some(req.start_block_id) + let start_block_id = Some(req.start_block_id.as_slice()) .filter(|i| !i.is_empty()) .map(BlockId::try_from) .transpose() @@ -299,6 +302,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { None => { let epoch = req .epoch + .clone() .map(Epoch::from) .map(|end| end.min(current_epoch)) .unwrap_or(current_epoch); @@ -324,7 +328,7 @@ impl ValidatorNodeRpcService for ValidatorNodeRpcServiceImpl { }; let (sender, receiver) = mpsc::channel(10); - task::spawn(BlockSyncTask::new(self.shard_state_store.clone(), start_block_id, None, sender).run()); + task::spawn(BlockSyncTask::new(self.shard_state_store.clone(), start_block_id, None, sender).run(req)); Ok(Streaming::new(receiver)) } diff --git a/applications/tari_validator_node/src/state_bootstrap.rs b/applications/tari_validator_node/src/state_bootstrap.rs index fc6a4ef9d..5ea7e27b8 100644 --- a/applications/tari_validator_node/src/state_bootstrap.rs +++ b/applications/tari_validator_node/src/state_bootstrap.rs @@ -171,7 +171,7 @@ where SubstateRecord { version: id.version(), substate_id: id.into_substate_id(), - substate_value: value.into(), + substate_value: Some(value.into()), state_hash: Default::default(), created_by_transaction: Default::default(), created_justify: *genesis_block.justify().id(), diff --git a/applications/tari_validator_node/src/substate_resolver.rs b/applications/tari_validator_node/src/substate_resolver.rs index 8c26390b6..76104ba6d 100644 --- a/applications/tari_validator_node/src/substate_resolver.rs +++ b/applications/tari_validator_node/src/substate_resolver.rs @@ -40,9 +40,8 @@ where } fn resolve_local_substates(&self, transaction: &Transaction) -> Result { - let mut substates = IndexMap::new(); let inputs = transaction.all_inputs_substate_ids_iter(); - let (mut found_local_substates, missing_substate_ids) = self + let (found_local_substates, missing_substate_ids) = self .store .with_read_tx(|tx| SubstateRecord::get_any_max_version(tx, inputs))?; @@ -63,16 +62,23 @@ where Some(requested_version) => { let maybe_match = found_local_substates .iter() - .find(|s| s.version() == requested_version && s.substate_id() == requested_input.substate_id()); + .find(|s| s.substate_id() == requested_input.substate_id()); match maybe_match { Some(substate) => { - if substate.is_destroyed() { + if substate.version() < requested_version { + return Err(SubstateResolverError::InputSubstateDoesNotExist { + substate_requirement: requested_input, + }); + } + + if substate.is_destroyed() || substate.version() > requested_version { return Err(SubstateResolverError::InputSubstateDowned { id: requested_input.into_substate_id(), version: requested_version, }); } + // OK }, // Requested substate or version not found. We know that the requested substate is not foreign @@ -86,10 +92,9 @@ where }, // No version specified, so we will use the latest version None => { - let (pos, substate) = found_local_substates + let substate = found_local_substates .iter() - .enumerate() - .find(|(_, s)| s.substate_id() == requested_input.substate_id()) + .find(| s| s.substate_id() == requested_input.substate_id()) // This is not possible .ok_or_else(|| { error!( @@ -100,12 +105,12 @@ where SubstateResolverError::InputSubstateDoesNotExist { substate_requirement: requested_input.clone() } })?; + // Latest version is DOWN if substate.is_destroyed() { - // The requested substate is downed locally, it may be available in a foreign shard so we add it - // to missing - let _substate = found_local_substates.remove(pos); - missing_substates.insert(requested_input); - continue; + return Err(SubstateResolverError::InputSubstateDowned { + id: requested_input.into_substate_id(), + version: substate.version(), + }); } // User did not specify the version, so we will use the latest version @@ -121,11 +126,13 @@ where missing_substate_ids.len(), ); - substates.extend( - found_local_substates - .into_iter() - .map(|s| (s.substate_id.clone(), s.into_substate())), - ); + let mut substates = IndexMap::new(); + substates.extend(found_local_substates.into_iter().map(|s| { + ( + s.substate_id.clone(), + s.into_substate().expect("All substates already checked UP"), + ) + })); Ok(ResolvedSubstates { local: substates, diff --git a/applications/tari_validator_node_web_ui/package-lock.json b/applications/tari_validator_node_web_ui/package-lock.json index 1006d7c1a..9b7f08690 100644 --- a/applications/tari_validator_node_web_ui/package-lock.json +++ b/applications/tari_validator_node_web_ui/package-lock.json @@ -38,7 +38,7 @@ }, "../../bindings": { "name": "@tari-project/typescript-bindings", - "version": "1.3.1", + "version": "1.4.0", "license": "ISC", "devDependencies": { "shx": "^0.3.4", diff --git a/applications/tari_validator_node_web_ui/src/routes/Transactions/TransactionDetails.tsx b/applications/tari_validator_node_web_ui/src/routes/Transactions/TransactionDetails.tsx index 99fdd3700..82c834af6 100644 --- a/applications/tari_validator_node_web_ui/src/routes/Transactions/TransactionDetails.tsx +++ b/applications/tari_validator_node_web_ui/src/routes/Transactions/TransactionDetails.tsx @@ -72,29 +72,29 @@ export default function TransactionDetails() { setDownSubstate(downSubstates["substates"]); setEvents( upSubstates["substates"].reduce( - (acc: Event[], cur: SubstateRecord) => - "TransactionReceipt" in cur?.substate_value && cur?.substate_value?.TransactionReceipt?.events - ? acc.concat(cur?.substate_value?.TransactionReceipt?.events) + (acc: Event[], { substate_value }: SubstateRecord) => + substate_value && "TransactionReceipt" in substate_value && substate_value?.TransactionReceipt?.events + ? acc.concat(substate_value?.TransactionReceipt?.events) : acc, [], ), ); setLogs( upSubstates["substates"].reduce( - (acc: LogEntry[], cur: SubstateRecord) => - "TransactionReceipt" in cur?.substate_value && cur?.substate_value?.TransactionReceipt?.events - ? acc.concat(cur?.substate_value?.TransactionReceipt?.logs) + (acc: LogEntry[], { substate_value }: SubstateRecord) => + substate_value && "TransactionReceipt" in substate_value && substate_value?.TransactionReceipt?.events + ? acc.concat(substate_value?.TransactionReceipt?.logs) : acc, [], ), ); setFee( upSubstates["substates"].reduce( - (acc: number, cur: SubstateRecord) => + (acc: number, { substate_value }: SubstateRecord) => acc + Number( - ("TransactionReceipt" in cur?.substate_value && - cur?.substate_value?.TransactionReceipt?.fee_receipt?.total_fees_paid) || + (substate_value && "TransactionReceipt" in substate_value && + substate_value?.TransactionReceipt?.fee_receipt?.total_fees_paid) || 0, ), 0, diff --git a/bindings/dist/types/SubstateRecord.d.ts b/bindings/dist/types/SubstateRecord.d.ts index e006beb9a..c9fae3209 100644 --- a/bindings/dist/types/SubstateRecord.d.ts +++ b/bindings/dist/types/SubstateRecord.d.ts @@ -7,7 +7,7 @@ import type { SubstateValue } from "./SubstateValue"; export interface SubstateRecord { substate_id: SubstateId; version: number; - substate_value: SubstateValue; + substate_value: SubstateValue | null; state_hash: string; created_by_transaction: string; created_justify: string; diff --git a/bindings/src/types/SubstateRecord.ts b/bindings/src/types/SubstateRecord.ts index f812fee52..881274940 100644 --- a/bindings/src/types/SubstateRecord.ts +++ b/bindings/src/types/SubstateRecord.ts @@ -9,7 +9,7 @@ import type { SubstateValue } from "./SubstateValue"; export interface SubstateRecord { substate_id: SubstateId; version: number; - substate_value: SubstateValue; + substate_value: SubstateValue | null; state_hash: string; created_by_transaction: string; created_justify: string; diff --git a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs index 9362a95d9..3de0b5b06 100644 --- a/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs +++ b/dan_layer/consensus/src/hotstuff/on_ready_to_vote_on_local_block.rs @@ -65,6 +65,7 @@ use crate::{ LocalPreparedTransaction, PledgedTransaction, PreparedTransaction, + TransactionLockConflicts, }, HotstuffConfig, ProposalValidationError, @@ -2002,6 +2003,10 @@ where TConsensusSpec: ConsensusSpec .transaction_pool .remove_all(tx, block.all_finalising_transactions_ids())?; + // Whenever we commit a block that will result in an abort for a transaction, we can remove lock conflicts to + // allow other "blocked" transactions to be proposed. + TransactionLockConflicts::remove_for_transactions(tx, block.all_aborting_transaction_ids())?; + if !finalized_transactions.is_empty() { // Remove locks for finalized transactions SubstateRecord::unlock_all(tx, finalized_transactions.iter().map(|t| t.transaction_id()))?; diff --git a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs index 7670c7e20..839553d6b 100644 --- a/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs +++ b/dan_layer/consensus/src/hotstuff/on_receive_new_transaction.rs @@ -4,7 +4,7 @@ use log::*; use tari_dan_common_types::{committee::CommitteeInfo, Epoch}; use tari_dan_storage::{ - consensus_models::{LockedSubstateValue, TransactionPool, TransactionRecord}, + consensus_models::{TransactionPool, TransactionRecord}, StateStore, }; use tari_engine_types::commit_result::RejectReason; @@ -134,11 +134,7 @@ where TConsensusSpec: ConsensusSpec rec.save(tx)?; - // Check if we're part of the input shard group and no input conflicts exist. If not, only sequence the - // transaction (is_ready=true, see foreign_proposal_processor) once we have received the LocalAccept - // foreign proposal. - let has_some_local_inputs_or_all_foreign_inputs = (rec.is_involved_in_inputs(local_committee_info) && - !self.has_possible_input_conflicts(&**tx, local_committee_info, &rec)?) || + let has_some_local_inputs_or_all_foreign_inputs = rec.is_involved_in_inputs(local_committee_info) || rec.has_all_foreign_input_pledges(&**tx, local_committee_info)?; if !has_some_local_inputs_or_all_foreign_inputs { @@ -152,32 +148,6 @@ where TConsensusSpec: ConsensusSpec Ok(Some((rec, has_some_local_inputs_or_all_foreign_inputs))) } - fn has_possible_input_conflicts( - &self, - tx: &::ReadTransaction<'_>, - local_committee_info: &CommitteeInfo, - transaction: &TransactionRecord, - ) -> Result { - let is_inputs_local_only = local_committee_info.includes_all_substate_addresses( - transaction - .transaction() - .all_inputs_iter() - .map(|i| i.to_substate_address_zero_version()), - ); - - let inputs = transaction.transaction().all_inputs_iter().collect::>(); - - let has_write_locks = LockedSubstateValue::get_transaction_id_that_has_any_write_locks_for_substates( - tx, - inputs.iter().map(|l| l.substate_id()), - // If this transaction is local only, we only care about non-local write locks - is_inputs_local_only, - )? - .is_some(); - - Ok(has_write_locks) - } - fn add_to_pool( &self, tx: &mut ::WriteTransaction<'_>, diff --git a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs index 70bcc6fd7..e00506f19 100644 --- a/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs +++ b/dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs @@ -91,7 +91,9 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> ReadableSubstateStore if substate.is_destroyed() { return Err(SubstateStoreError::SubstateIsDown { id: id.to_owned() }); } - Ok(substate.into_substate()) + Ok(substate + .into_substate() + .expect("PendingSubstateStore::get UP substate has no value")) } } @@ -227,7 +229,9 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store id: VersionedSubstateId::new(id.clone(), substate.version()), shard: substate.created_by_shard, transaction_id: substate.created_by_transaction, - substate: substate.into_substate(), + substate: substate + .into_substate() + .expect("PendingSubstateStore::get_latest_change: UP substate has no value"), }) } diff --git a/dan_layer/consensus/src/hotstuff/transaction_manager/lock_deps.rs b/dan_layer/consensus/src/hotstuff/transaction_manager/lock_deps.rs index 8dc8800dc..d59c71ef9 100644 --- a/dan_layer/consensus/src/hotstuff/transaction_manager/lock_deps.rs +++ b/dan_layer/consensus/src/hotstuff/transaction_manager/lock_deps.rs @@ -37,4 +37,15 @@ impl TransactionLockConflicts { tx.lock_conflicts_insert_all(block_id, &self.conflicts) } + + pub(crate) fn remove_for_transactions< + 'a, + TTx: StateStoreWriteTransaction, + I: IntoIterator, + >( + tx: &mut TTx, + transaction_ids: I, + ) -> Result<(), StorageError> { + tx.lock_conflicts_remove_by_transaction_ids(transaction_ids) + } } diff --git a/dan_layer/consensus/src/traits/substate_store.rs b/dan_layer/consensus/src/traits/substate_store.rs index 53c66f527..8215d54c1 100644 --- a/dan_layer/consensus/src/traits/substate_store.rs +++ b/dan_layer/consensus/src/traits/substate_store.rs @@ -1,16 +1,11 @@ // Copyright 2024 The Tari Project // SPDX-License-Identifier: BSD-3-Clause -use tari_dan_common_types::{ToSubstateAddress, VersionedSubstateIdRef}; -use tari_dan_storage::{ - consensus_models::{SubstateChange, SubstateRecord}, - StateStoreReadTransaction, -}; +use tari_dan_common_types::VersionedSubstateIdRef; +use tari_dan_storage::consensus_models::SubstateChange; use tari_engine_types::substate::{Substate, SubstateDiff}; use tari_transaction::TransactionId; -use crate::hotstuff::substate_store::SubstateStoreError; - pub trait ReadableSubstateStore { type Error; @@ -26,12 +21,3 @@ pub trait WriteableSubstateStore: ReadableSubstateStore { pub trait SubstateStore: ReadableSubstateStore + WriteableSubstateStore {} impl SubstateStore for T {} - -impl ReadableSubstateStore for &T { - type Error = SubstateStoreError; - - fn get(&self, id: VersionedSubstateIdRef<'_>) -> Result { - let substate = SubstateRecord::get(*self, &id.to_substate_address())?; - Ok(substate.into_substate()) - } -} diff --git a/dan_layer/consensus_tests/src/consensus.rs b/dan_layer/consensus_tests/src/consensus.rs index a6c8efea8..191879f31 100644 --- a/dan_layer/consensus_tests/src/consensus.rs +++ b/dan_layer/consensus_tests/src/consensus.rs @@ -1569,6 +1569,7 @@ async fn multishard_publish_template() { .unwrap(); let binary = template_substate .substate_value + .unwrap() .into_template() .expect("Expected template substate") .binary; diff --git a/dan_layer/consensus_tests/src/substate_store.rs b/dan_layer/consensus_tests/src/substate_store.rs index 29bf60b3e..280d4f0d2 100644 --- a/dan_layer/consensus_tests/src/substate_store.rs +++ b/dan_layer/consensus_tests/src/substate_store.rs @@ -210,7 +210,7 @@ fn add_substate(store: &TestStore, seed: u8, version: u32) -> VersionedSubstateI SubstateRecord { substate_id: id.clone(), version, - substate_value: value, + substate_value: Some(value), state_hash: [seed; 32].into(), created_by_transaction: Default::default(), created_justify: QcId::zero(), diff --git a/dan_layer/p2p/proto/rpc.proto b/dan_layer/p2p/proto/rpc.proto index 02da0294b..dc32bba8d 100644 --- a/dan_layer/p2p/proto/rpc.proto +++ b/dan_layer/p2p/proto/rpc.proto @@ -159,8 +159,6 @@ message SubstateData { bytes created_transaction = 7; } - - message SubstateUpdate { oneof update { SubstateCreatedProof create = 1; @@ -186,6 +184,12 @@ message SyncBlocksRequest { // Optional - If start_block_id is provided, this is ignored. Must be provided if start_block_id is not provided. // In which case, start block is implicitly the first block of the epoch. tari.dan.common.Epoch epoch = 2; + // If true, QCs will be streamed. + bool stream_qcs = 3; + // If true, substates will be streamed. + bool stream_substates = 4; + // If true, transactions in the block will be streamed + bool stream_transactions = 5; } message SyncBlocksResponse { @@ -194,7 +198,8 @@ message SyncBlocksResponse { QuorumCertificates quorum_certificates = 2; uint32 substate_count = 3; SubstateUpdate substate_update = 4; - Transactions transactions = 5; + uint32 transaction_count = 5; + tari.dan.transaction.Transaction transaction = 6; } } @@ -202,10 +207,6 @@ message QuorumCertificates { repeated tari.dan.consensus.QuorumCertificate quorum_certificates = 1; } -message Transactions { - repeated tari.dan.transaction.Transaction transactions = 1; -} - message GetHighQcRequest {} message GetHighQcResponse { diff --git a/dan_layer/p2p/src/block_sync.rs b/dan_layer/p2p/src/block_sync.rs index 431f7eebc..7f8b27d4c 100644 --- a/dan_layer/p2p/src/block_sync.rs +++ b/dan_layer/p2p/src/block_sync.rs @@ -5,7 +5,7 @@ use crate::{ proto, proto::{ consensus::{Block, QuorumCertificate}, - rpc::{sync_blocks_response::SyncData, QuorumCertificates, SubstateUpdate, Transactions}, + rpc::{sync_blocks_response::SyncData, QuorumCertificates, SubstateUpdate}, transaction::Transaction, }, }; @@ -32,6 +32,13 @@ impl proto::rpc::SyncBlocksResponse { } } + pub fn transaction_count(&self) -> Option { + match self.sync_data { + Some(SyncData::TransactionCount(count)) => Some(count), + _ => None, + } + } + pub fn into_substate_update(self) -> Option { match self.sync_data { Some(SyncData::SubstateUpdate(update)) => Some(update), @@ -39,9 +46,9 @@ impl proto::rpc::SyncBlocksResponse { } } - pub fn into_transactions(self) -> Option> { + pub fn into_transaction(self) -> Option { match self.sync_data { - Some(SyncData::Transactions(Transactions { transactions })) => Some(transactions), + Some(SyncData::Transaction(transaction)) => Some(transaction), _ => None, } } diff --git a/dan_layer/p2p/src/conversions/consensus.rs b/dan_layer/p2p/src/conversions/consensus.rs index f7961f08b..ac1475951 100644 --- a/dan_layer/p2p/src/conversions/consensus.rs +++ b/dan_layer/p2p/src/conversions/consensus.rs @@ -909,7 +909,10 @@ impl TryFrom for SubstateRecord { Ok(Self { substate_id: SubstateId::from_bytes(&value.substate_id)?, version: value.version, - substate_value: SubstateValue::from_bytes(&value.substate)?, + substate_value: Some(value.substate.as_slice()) + .filter(|d| !d.is_empty()) + .map(SubstateValue::from_bytes) + .transpose()?, // TODO: Should we add this to the proto? state_hash: Default::default(), @@ -930,7 +933,7 @@ impl From for proto::consensus::Substate { Self { substate_id: value.substate_id.to_bytes(), version: value.version, - substate: value.substate_value.to_bytes(), + substate: value.substate_value.as_ref().map(|s| s.to_bytes()).unwrap_or_default(), created_transaction: value.created_by_transaction.as_bytes().to_vec(), created_justify: value.created_justify.as_bytes().to_vec(), diff --git a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql index 1f9625ea4..8a0a81984 100644 --- a/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql +++ b/dan_layer/state_store_sqlite/migrations/2023-06-08-091819_create_state_store/up.sql @@ -148,7 +148,7 @@ create table substates address text not NULL, substate_id text not NULL, version integer not NULL, - data text not NULL, + data text NULL, state_hash text not NULL, created_by_transaction text not NULL, created_justify text not NULL, diff --git a/dan_layer/state_store_sqlite/src/reader.rs b/dan_layer/state_store_sqlite/src/reader.rs index fad196402..c89af71fb 100644 --- a/dan_layer/state_store_sqlite/src/reader.rs +++ b/dan_layer/state_store_sqlite/src/reader.rs @@ -1509,6 +1509,9 @@ impl<'tx, TAddr: NodeAddressable + Serialize + DeserializeOwned + 'tx> StateStor use crate::schema::quorum_certificates; let qc_ids: Vec = qc_ids.into_iter().map(serialize_hex).collect(); + if qc_ids.is_empty() { + return Ok(vec![]); + } let qc_json = quorum_certificates::table .select(quorum_certificates::json) diff --git a/dan_layer/state_store_sqlite/src/schema.rs b/dan_layer/state_store_sqlite/src/schema.rs index bd7a09ccc..87e1b03ee 100644 --- a/dan_layer/state_store_sqlite/src/schema.rs +++ b/dan_layer/state_store_sqlite/src/schema.rs @@ -406,7 +406,7 @@ diesel::table! { address -> Text, substate_id -> Text, version -> Integer, - data -> Text, + data -> Nullable, state_hash -> Text, created_by_transaction -> Text, created_justify -> Text, diff --git a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs index 3e3d71332..39e28c79d 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/state_transition.rs @@ -43,9 +43,14 @@ impl StateTransition { let update = match self.transition.as_str() { "UP" => SubstateUpdate::Create(SubstateCreatedProof { substate: SubstateData { + substate_value: substate.substate_value.ok_or_else(|| StorageError::DataInconsistency { + details: format!( + "StateTransition::try_convert: Up substate {} does not have a value", + self.substate_id + ), + })?, substate_id: substate.substate_id, version: substate.version, - substate_value: substate.substate_value, created_by_transaction: substate.created_by_transaction, }, }), diff --git a/dan_layer/state_store_sqlite/src/sql_models/substate.rs b/dan_layer/state_store_sqlite/src/sql_models/substate.rs index 96fa95324..c6c9af628 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/substate.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/substate.rs @@ -14,7 +14,7 @@ pub struct SubstateRecord { pub address: String, pub substate_id: String, pub version: i32, - pub data: String, + pub data: Option, pub state_hash: String, pub created_by_transaction: String, pub created_justify: String, @@ -67,7 +67,7 @@ impl TryFrom for consensus_models::SubstateRecord { Ok(Self { substate_id: parse_from_string(&value.substate_id)?, version: value.version as u32, - substate_value: deserialize_json(&value.data)?, + substate_value: value.data.as_deref().map(deserialize_json).transpose()?, state_hash: deserialize_hex_try_from(&value.state_hash)?, created_by_transaction: deserialize_hex_try_from(&value.created_by_transaction)?, created_justify: deserialize_hex_try_from(&value.created_justify)?, diff --git a/dan_layer/state_store_sqlite/src/sql_models/substate_lock.rs b/dan_layer/state_store_sqlite/src/sql_models/substate_lock.rs index 50f50daa2..4b8910534 100644 --- a/dan_layer/state_store_sqlite/src/sql_models/substate_lock.rs +++ b/dan_layer/state_store_sqlite/src/sql_models/substate_lock.rs @@ -59,7 +59,7 @@ impl SubstateLock { locked_by_block: deserialize_hex_try_from(&self.block_id)?, substate_id: id, lock: self.try_into_substate_lock()?, - value: substate_rec.map(|r| r.into_substate_value()), + value: substate_rec.and_then(|r| r.into_substate_value()), }) } } diff --git a/dan_layer/state_store_sqlite/src/writer.rs b/dan_layer/state_store_sqlite/src/writer.rs index b83e0437f..e729917d3 100644 --- a/dan_layer/state_store_sqlite/src/writer.rs +++ b/dan_layer/state_store_sqlite/src/writer.rs @@ -1675,7 +1675,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta substates::address.eq(serialize_hex(substate.to_substate_address())), substates::substate_id.eq(substate.substate_id.to_string()), substates::version.eq(substate.version as i32), - substates::data.eq(serialize_json(&substate.substate_value)?), + substates::data.eq(substate.substate_value.as_ref().map(serialize_json).transpose()?), substates::state_hash.eq(serialize_hex(substate.state_hash)), substates::created_by_transaction.eq(serialize_hex(substate.created_by_transaction)), substates::created_justify.eq(serialize_hex(substate.created_justify)), @@ -1746,6 +1746,7 @@ impl<'tx, TAddr: NodeAddressable + 'tx> StateStoreWriteTransaction for SqliteSta substates::destroyed_at_epoch.eq(Some(epoch.as_u64() as i64)), substates::destroyed_by_shard.eq(Some(shard.as_u32() as i32)), substates::destroyed_justify.eq(Some(serialize_hex(destroyed_qc_id))), + substates::data.eq(None::), ); let address = versioned_substate_id.to_substate_address(); diff --git a/dan_layer/storage/src/consensus_models/block.rs b/dan_layer/storage/src/consensus_models/block.rs index 171877665..8662f2b3c 100644 --- a/dan_layer/storage/src/consensus_models/block.rs +++ b/dan_layer/storage/src/consensus_models/block.rs @@ -306,6 +306,10 @@ impl Block { self.commands.iter().filter_map(|d| d.finalising()).map(|t| t.id()) } + pub fn all_aborting_transaction_ids(&self) -> impl Iterator + '_ { + self.commands.iter().filter_map(|d| d.aborting()).map(|t| t.id()) + } + pub fn all_foreign_proposals(&self) -> impl Iterator + '_ { self.commands.iter().filter_map(|c| c.foreign_proposal()) } @@ -836,23 +840,25 @@ impl Block { // 2. The substate was created by this transaction and destroyed in a later transaction // It isn't possible for a substate to be created and destroyed by the same transaction // because the engine can never emit such a substate diff. - if substate.created_by_transaction == transaction.id { - updates.push(SubstateUpdate::Create(SubstateCreatedProof { - // created_qc: substate.get_created_quorum_certificate(tx)?, - substate: substate.into(), - })); - } else { - updates.push(SubstateUpdate::Destroy(SubstateDestroyedProof { - substate_id: substate.substate_id.clone(), - version: substate.version, - // justify: QuorumCertificate::get(tx, &destroyed.justify)?, - destroyed_by_transaction: destroyed.by_transaction, - })); - } + // TODO: This is currently not used - if we need this in future, we can include the state hash en + // lieu of the actual state which does not exist + // if substate.created_by_transaction == transaction.id + // { updates.push(SubstateUpdate::Create(SubstateCreatedProof { + // // created_qc: substate.get_created_quorum_certificate(tx)?, + // substate: substate.try_into()?, + // })); + // } else { + updates.push(SubstateUpdate::Destroy(SubstateDestroyedProof { + substate_id: substate.substate_id.clone(), + version: substate.version, + // justify: QuorumCertificate::get(tx, &destroyed.justify)?, + destroyed_by_transaction: destroyed.by_transaction, + })); + // } } else { updates.push(SubstateUpdate::Create(SubstateCreatedProof { // created_qc: substate.get_created_quorum_certificate(tx)?, - substate: substate.into(), + substate: substate.try_into()?, })); }; } diff --git a/dan_layer/storage/src/consensus_models/command.rs b/dan_layer/storage/src/consensus_models/command.rs index cc5bdeb5b..05fa634c1 100644 --- a/dan_layer/storage/src/consensus_models/command.rs +++ b/dan_layer/storage/src/consensus_models/command.rs @@ -255,6 +255,15 @@ impl Command { .filter(|t| t.decision.is_commit()) } + /// Returns Some if the command **will** result in aborting the transaction, otherwise None. + pub fn aborting(&self) -> Option<&TransactionAtom> { + self.some_accept() + .or_else(|| self.local_prepare()) + .or_else(|| self.local_accept()) + .or_else(|| self.local_only()) + .filter(|t| t.decision.is_abort()) + } + pub fn is_epoch_end(&self) -> bool { matches!(self, Command::EndEpoch) } diff --git a/dan_layer/storage/src/consensus_models/substate.rs b/dan_layer/storage/src/consensus_models/substate.rs index 5fb9d8575..0fe163cf3 100644 --- a/dan_layer/storage/src/consensus_models/substate.rs +++ b/dan_layer/storage/src/consensus_models/substate.rs @@ -32,7 +32,7 @@ use crate::{ pub struct SubstateRecord { pub substate_id: SubstateId, pub version: u32, - pub substate_value: SubstateValue, + pub substate_value: Option, #[cfg_attr(feature = "ts", ts(type = "string"))] pub state_hash: FixedHash, #[cfg_attr(feature = "ts", ts(type = "string"))] @@ -80,7 +80,7 @@ impl SubstateRecord { substate_id, version, state_hash: hash_substate(&substate_value, version), - substate_value, + substate_value: Some(substate_value), created_height, created_justify, created_by_shard, @@ -107,20 +107,16 @@ impl SubstateRecord { &self.substate_id } - pub fn substate_value(&self) -> &SubstateValue { - &self.substate_value + pub fn substate_value(&self) -> Option<&SubstateValue> { + self.substate_value.as_ref() } - pub fn into_substate_value(self) -> SubstateValue { + pub fn into_substate_value(self) -> Option { self.substate_value } - pub fn to_substate(&self) -> Substate { - Substate::new(self.version, self.substate_value.clone()) - } - - pub fn into_substate(self) -> Substate { - Substate::new(self.version, self.substate_value) + pub fn into_substate(self) -> Option { + Some(Substate::new(self.version, self.substate_value?)) } pub fn version(&self) -> u32 { @@ -368,14 +364,21 @@ impl SubstateData { } } -impl From for SubstateData { - fn from(value: SubstateRecord) -> Self { - Self { +impl TryFrom for SubstateData { + type Error = StorageError; + + fn try_from(value: SubstateRecord) -> Result { + Ok(Self { + substate_value: value.substate_value.ok_or_else(|| StorageError::DataInconsistency { + details: format!( + "Cannot convert substate record {} with null value to SubstateData", + value.substate_id + ), + })?, substate_id: value.substate_id, version: value.version, - substate_value: value.substate_value, created_by_transaction: value.created_by_transaction, - } + }) } } diff --git a/dan_layer/storage/src/consensus_models/substate_change.rs b/dan_layer/storage/src/consensus_models/substate_change.rs index f07727993..2e35e18ca 100644 --- a/dan_layer/storage/src/consensus_models/substate_change.rs +++ b/dan_layer/storage/src/consensus_models/substate_change.rs @@ -8,8 +8,6 @@ use tari_engine_types::substate::Substate; use tari_state_tree::SubstateTreeChange; use tari_transaction::TransactionId; -use crate::consensus_models::SubstateRecord; - #[derive(Debug, Clone)] pub enum SubstateChange { Up { @@ -98,25 +96,6 @@ impl SubstateChange { } } -impl From for SubstateChange { - fn from(value: SubstateRecord) -> Self { - if let Some(destroyed) = value.destroyed() { - Self::Down { - id: value.to_versioned_substate_id(), - shard: destroyed.by_shard, - transaction_id: destroyed.by_transaction, - } - } else { - Self::Up { - id: value.to_versioned_substate_id(), - shard: value.created_by_shard, - transaction_id: value.created_by_transaction, - substate: value.into_substate(), - } - } - } -} - impl Display for SubstateChange { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self {