Skip to content

Commit

Permalink
feat(consensus): skip to local accept phase for aborted transactions (#…
Browse files Browse the repository at this point in the history
…1273)

Description
---
feat(consensus): skip to local accept phase for aborted transactions
Missing consensus-level  transaction validators 
Additional atom checks in LocalAccept

Motivation and Context
---
Since aborted transactions do not lock or pledge inputs, they can
proceed directly to Accept phase
This removes the need for 2 cross-shard exchanges.

How Has This Been Tested?
---
Still testing

What process can a PR reviewer use to test or verify this change?
---


Breaking Changes
---

- [x] None
- [ ] Requires data directory to be deleted
- [ ] Other - Please specify
  • Loading branch information
sdbondi authored Jan 30, 2025
1 parent d01ce85 commit ec0eb2e
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 55 deletions.
22 changes: 18 additions & 4 deletions applications/tari_validator_node/src/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ use tokio::{
#[cfg(feature = "metrics")]
use crate::consensus::metrics::PrometheusConsensusMetrics;
use crate::{
consensus::{self, ConsensusHandle, TariDanBlockTransactionExecutor},
consensus::{self, ConsensusHandle, TariDanBlockTransactionExecutor, ValidationContext},
dry_run_transaction_processor::DryRunTransactionProcessor,
file_l1_submitter::FileLayerOneSubmitter,
p2p::{
Expand All @@ -86,11 +86,14 @@ use crate::{
state_bootstrap::bootstrap_state,
substate_resolver::TariSubstateResolver,
transaction_validators::{
EpochRangeValidator,
FeeTransactionValidator,
HasInputs,
TemplateExistsValidator,
TransactionNetworkValidator,
TransactionSignatureValidator,
TransactionValidationError,
WithContext,
},
validator::Validator,
validator_registration_file::ValidatorRegistrationFile,
Expand Down Expand Up @@ -281,7 +284,7 @@ pub async fn spawn_services(
);
let transaction_executor = TariDanBlockTransactionExecutor::new(
payload_processor.clone(),
consensus::create_transaction_validator(template_manager.clone()).boxed(),
create_consensus_transaction_validator(config.network, template_manager.clone()).boxed(),
);

#[cfg(feature = "metrics")]
Expand Down Expand Up @@ -489,12 +492,23 @@ async fn spawn_p2p_rpc(
Ok(())
}

fn create_mempool_transaction_validator(
pub fn create_mempool_transaction_validator(
network: Network,
template_manager: TemplateManager<PeerAddress>,
) -> impl Validator<Transaction, Context = (), Error = TransactionValidationError> {
TransactionNetworkValidator::new(network)
.and_then(HasInputs::new())
.and_then(TemplateExistsValidator::new(template_manager))
.and_then(FeeTransactionValidator)
.and_then(TransactionSignatureValidator)
.and_then(HasInputs::new())
.and_then(TemplateExistsValidator::new(template_manager))
}

pub fn create_consensus_transaction_validator(
network: Network,
template_manager: TemplateManager<PeerAddress>,
) -> impl Validator<Transaction, Context = ValidationContext, Error = TransactionValidationError> {
WithContext::<ValidationContext, _, _>::new()
.map_context(|_| (), create_mempool_transaction_validator(network, template_manager))
.map_context(|c| c.current_epoch, EpochRangeValidator::new())
}
27 changes: 3 additions & 24 deletions applications/tari_validator_node/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,8 @@ use crate::{
consensus::{leader_selection::RoundRobinLeaderStrategy, spec::TariConsensusSpec},
event_subscription::EventSubscription,
p2p::services::messaging::{ConsensusInboundMessaging, ConsensusOutboundMessaging},
transaction_validators::{
EpochRangeValidator,
FeeTransactionValidator,
HasInputs,
TemplateExistsValidator,
TransactionSignatureValidator,
TransactionValidationError,
},
validator::{BoxedValidator, Validator},
transaction_validators::TransactionValidationError,
validator::BoxedValidator,
};

mod block_transaction_executor;
Expand All @@ -53,7 +46,7 @@ pub use signature_service::*;
use tari_consensus::{consensus_constants::ConsensusConstants, hotstuff::HotstuffEvent};
use tari_dan_app_utilities::template_manager::interface::TemplateManagerHandle;

use crate::{p2p::NopLogger, transaction_validators::WithContext};
use crate::p2p::NopLogger;

pub type ConsensusTransactionValidator = BoxedValidator<ValidationContext, Transaction, TransactionValidationError>;

Expand Down Expand Up @@ -132,17 +125,3 @@ pub async fn spawn(

(join_handle, consensus_handle)
}

pub fn create_transaction_validator(
template_manager: TemplateManager<PeerAddress>,
) -> impl Validator<Transaction, Context = ValidationContext, Error = TransactionValidationError> {
WithContext::<ValidationContext, _, _>::new()
.map_context(
|_| (),
HasInputs::new()
.and_then(TransactionSignatureValidator)
.and_then(TemplateExistsValidator::new(template_manager)),
)
.map_context(|c| c.current_epoch, EpochRangeValidator::new())
.map_context(|_| (), FeeTransactionValidator)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,13 @@

import { useEffect, useState } from "react";
import { useParams } from "react-router-dom";
// import { transactionsGet } from '../../utils/json_rpc';
import { Accordion, AccordionDetails, AccordionSummary } from "../../Components/Accordion";
import { Alert, Button, Fade, Grid, Table, TableBody, TableCell, TableContainer, TableRow } from "@mui/material";
import Typography from "@mui/material/Typography";
import { DataTableCell, StyledPaper } from "../../Components/StyledComponents";
import PageHeading from "../../Components/PageHeading";
import Events from "./Events";
import Logs from "./Logs";
import FeeInstructions from "./FeeInstructions";
import Instructions from "./Instructions";
import Substates from "./Substates";
import KeyboardArrowDownIcon from "@mui/icons-material/KeyboardArrowDown";
Expand Down
43 changes: 43 additions & 0 deletions dan_layer/common_types/src/versioned_substate_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,49 @@ impl Borrow<SubstateId> for SubstateRequirement {
}
}

#[derive(Debug, Clone, Copy)]
pub struct SubstateRequirementRef<'a> {
pub substate_id: &'a SubstateId,
pub version: Option<u32>,
}

impl<'a> SubstateRequirementRef<'a> {
pub fn new(substate_id: &'a SubstateId, version: Option<u32>) -> Self {
Self { substate_id, version }
}

pub fn to_owned(&self) -> SubstateRequirement {
SubstateRequirement::new(self.substate_id.clone(), self.version)
}
}

impl<'a> From<&'a VersionedSubstateId> for SubstateRequirementRef<'a> {
fn from(value: &'a VersionedSubstateId) -> Self {
Self {
substate_id: &value.substate_id,
version: Some(value.version),
}
}
}

impl<'a> From<&'a SubstateRequirement> for SubstateRequirementRef<'a> {
fn from(value: &'a SubstateRequirement) -> Self {
Self {
substate_id: &value.substate_id,
version: value.version,
}
}
}

impl<'a> From<VersionedSubstateIdRef<'a>> for SubstateRequirementRef<'a> {
fn from(value: VersionedSubstateIdRef<'a>) -> Self {
Self {
substate_id: value.substate_id,
version: Some(value.version),
}
}
}

#[derive(Debug, thiserror::Error)]
#[error("Failed to parse substate requirement {0}")]
pub struct SubstateRequirementParseError(String);
Expand Down
4 changes: 4 additions & 0 deletions dan_layer/consensus/src/hotstuff/on_propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,10 @@ where TConsensusSpec: ConsensusSpec
),
// Leader thinks all local nodes have prepared
TransactionPoolStage::Prepared => {
if tx_rec.current_decision().is_abort() {
let atom = tx_rec.get_current_transaction_atom();
return Ok(Some(Command::LocalAccept(atom)));
}
if tx_rec
.evidence()
.is_committee_output_only(local_committee_info.shard_group())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1378,15 +1378,16 @@ where TConsensusSpec: ConsensusSpec
return Ok(Some(NoVoteReason::TransactionNotInPool));
};

#[allow(clippy::nonminimal_bool)]
if !tx_rec.current_stage().is_all_prepared() &&
!tx_rec.current_stage().is_some_prepared() &&
// We allow the transition from LocalPrepared to LocalAccepted if we are output-only
!(tx_rec.current_stage().is_prepared() &&
let is_applicable_stage = tx_rec.current_stage().is_all_prepared() ||
tx_rec.current_stage().is_some_prepared() ||
// We allow the transition from Prepared to LocalAccepted if ABORT, or we are output-only
(tx_rec.current_stage().is_prepared() &&
(tx_rec.current_decision().is_abort() ||
tx_rec
.evidence()
.is_committee_output_only(local_committee_info.shard_group()))
{
.is_committee_output_only(local_committee_info.shard_group())));

if !is_applicable_stage {
warn!(
target: LOG_TARGET,
"{} ❌ Stage disagreement in block {} for transaction {}. Leader proposed LocalAccept, but current stage is {}",
Expand Down Expand Up @@ -1435,7 +1436,7 @@ where TConsensusSpec: ConsensusSpec
let Some(ref leader_fee) = atom.leader_fee else {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} not set for AllAccept command in block {}",
"❌ NO VOTE: Leader fee in tx {} not set for LocalAccept command in block {}",
atom.id,
block,
);
Expand All @@ -1462,6 +1463,16 @@ where TConsensusSpec: ConsensusSpec
}

tx_rec.set_leader_fee(calculated_leader_fee);
} else if atom.leader_fee.is_some() {
warn!(
target: LOG_TARGET,
"❌ NO VOTE: Leader fee in tx {} is set for LocalAccept ABORT command in block {}",
atom.id,
block,
);
return Ok(Some(NoVoteReason::LeaderFeeDisagreement));
} else {
// Ok
}

// on_propose does not process foreign proposals, so the QC evidence may not match the evidence here.
Expand Down
32 changes: 20 additions & 12 deletions dan_layer/consensus/src/hotstuff/substate_store/pending_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,18 +144,6 @@ impl<'a, 'tx, TStore: StateStore + 'a + 'tx> WriteableSubstateStore for PendingS
}

impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store, 'tx, TStore> {
pub fn exists(&self, id: &VersionedSubstateId) -> Result<bool, SubstateStoreError> {
if self.pending.contains_key(&id.to_substate_address()) {
return Ok(true);
}

if SubstateRecord::exists(self.read_transaction(), id)? {
return Ok(true);
}

Ok(false)
}

pub fn get_latest_version(&self, id: &SubstateId) -> Result<LatestSubstateVersion, SubstateStoreError> {
if let Some(ch) = self.head.get(id).map(|&pos| &self.diff[pos]) {
// if ch.is_down() {
Expand Down Expand Up @@ -662,6 +650,15 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store
return Ok(());
}

if let Some(change) =
BlockDiff::get_for_versioned_substate(self.read_transaction(), &self.parent_block, id).optional()?
{
if change.is_up() {
return Err(SubstateStoreError::ExpectedSubstateDown { id: id.clone() });
}
return Ok(());
}

let address = id.to_substate_address();
let Some(is_up) = SubstateRecord::substate_is_up(self.read_transaction(), &address).optional()? else {
debug!(target: LOG_TARGET, "Expected substate {} to be DOWN but it does not exist", address);
Expand All @@ -684,6 +681,17 @@ impl<'store, 'tx, TStore: StateStore + 'store + 'tx> PendingSubstateStore<'store
return Ok(());
}

if let Some(change) =
BlockDiff::get_for_versioned_substate(self.read_transaction(), &self.parent_block, id).optional()?
{
if change.is_up() {
return Err(SubstateStoreError::LockFailed(LockFailedError::SubstateIsUp {
id: id.clone(),
}));
}
return Ok(());
}

if SubstateRecord::exists(self.read_transaction(), id)? {
return Err(SubstateStoreError::LockFailed(LockFailedError::SubstateIsUp {
id: id.clone(),
Expand Down
6 changes: 6 additions & 0 deletions dan_layer/storage/src/consensus_models/evidence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,12 @@ impl Evidence {
.all(|e| e.is_prepare_justified() || e.is_accept_justified())
}

pub fn some_shard_groups_prepared(&self) -> bool {
self.evidence
.values()
.any(|e| e.is_prepare_justified() || e.is_accept_justified())
}

pub fn all_input_shard_groups_prepared(&self) -> bool {
self.evidence
.values()
Expand Down
13 changes: 8 additions & 5 deletions dan_layer/storage/src/consensus_models/transaction_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,8 +430,8 @@ impl TransactionPoolRecord {
},
TransactionPoolStage::LocalAccepted => match self.current_decision() {
Decision::Commit => self.evidence.all_shard_groups_accepted(),
// If we have decided to abort, we can continue if all inputs are justified
Decision::Abort(_) => self.evidence.all_shard_groups_prepared(),
// If we have decided to abort, we can continue if any foreign shard or locally has prepared
Decision::Abort(_) => self.evidence.some_shard_groups_prepared(),
},
TransactionPoolStage::AllAccepted |
TransactionPoolStage::SomeAccepted |
Expand Down Expand Up @@ -491,6 +491,9 @@ impl TransactionPoolRecord {
}

pub fn leader_fee(&self) -> Option<&LeaderFee> {
if self.current_decision().is_abort() {
return None;
}
self.leader_fee.as_ref()
}

Expand All @@ -508,7 +511,7 @@ impl TransactionPoolRecord {
decision: self.current_decision(),
evidence: self.evidence.clone(),
transaction_fee: self.transaction_fee,
leader_fee: self.leader_fee.clone(),
leader_fee: self.leader_fee().cloned(),
}
}

Expand All @@ -518,17 +521,17 @@ impl TransactionPoolRecord {
decision: self.current_local_decision(),
evidence: self.evidence.clone(),
transaction_fee: self.transaction_fee,
leader_fee: self.leader_fee.clone(),
leader_fee: self.leader_fee().cloned(),
}
}

pub fn into_current_transaction_atom(self) -> TransactionAtom {
TransactionAtom {
id: self.transaction_id,
decision: self.current_decision(),
leader_fee: self.leader_fee().cloned(),
evidence: self.evidence,
transaction_fee: self.transaction_fee,
leader_fee: self.leader_fee,
}
}

Expand Down

0 comments on commit ec0eb2e

Please sign in to comment.