diff --git a/src/filters/mod.rs b/src/filters/mod.rs index aa2f867f..87e78d58 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -9,6 +9,7 @@ pub mod noop; pub mod parse_cbor; pub mod select; pub mod split_block; +pub mod split_tx; #[cfg(feature = "wasm")] pub mod wasm_plugin; @@ -16,6 +17,7 @@ pub mod wasm_plugin; pub enum Bootstrapper { Noop(noop::Stage), SplitBlock(split_block::Stage), + SplitTx(split_tx::Stage), IntoJson(into_json::Stage), LegacyV1(legacy_v1::Stage), ParseCbor(parse_cbor::Stage), @@ -30,6 +32,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(p) => &mut p.input, Bootstrapper::SplitBlock(p) => &mut p.input, + Bootstrapper::SplitTx(p) => &mut p.input, Bootstrapper::IntoJson(p) => &mut p.input, Bootstrapper::LegacyV1(p) => &mut p.input, Bootstrapper::ParseCbor(p) => &mut p.input, @@ -44,6 +47,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(p) => &mut p.output, Bootstrapper::SplitBlock(p) => &mut p.output, + Bootstrapper::SplitTx(p) => &mut p.output, Bootstrapper::IntoJson(p) => &mut p.output, Bootstrapper::LegacyV1(p) => &mut p.output, Bootstrapper::ParseCbor(p) => &mut p.output, @@ -58,6 +62,7 @@ impl Bootstrapper { match self { Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy), + Bootstrapper::SplitTx(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::IntoJson(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy), Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy), @@ -74,6 +79,7 @@ impl Bootstrapper { pub enum Config { Noop(noop::Config), SplitBlock(split_block::Config), + SplitTx(split_tx::Config), IntoJson(into_json::Config), LegacyV1(legacy_v1::Config), ParseCbor(parse_cbor::Config), @@ -88,6 +94,7 @@ impl Config { match self { Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)), Config::SplitBlock(c) => Ok(Bootstrapper::SplitBlock(c.bootstrapper(ctx)?)), + Config::SplitTx(c) => Ok(Bootstrapper::SplitTx(c.bootstrapper(ctx)?)), Config::IntoJson(c) => Ok(Bootstrapper::IntoJson(c.bootstrapper(ctx)?)), Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)), Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)), diff --git a/src/filters/parse_cbor.rs b/src/filters/parse_cbor.rs index d085781d..0e833fe2 100644 --- a/src/filters/parse_cbor.rs +++ b/src/filters/parse_cbor.rs @@ -40,7 +40,7 @@ impl From<&Stage> for Worker { gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { let output = unit.clone().try_map_record(|r| match r { - Record::CborTx(cbor) => { + Record::CborTx(_, cbor) => { let tx = trv::MultiEraTx::decode(&cbor).or_panic()?; let tx = stage.mapper.map_tx(&tx); Ok(Record::ParsedTx(tx)) diff --git a/src/filters/split_block.rs b/src/filters/split_block.rs index f15cb166..e9d1c915 100644 --- a/src/filters/split_block.rs +++ b/src/filters/split_block.rs @@ -4,6 +4,7 @@ use gasket::framework::*; use serde::Deserialize; use std::borrow::Cow; +use pallas::crypto::hash::Hash; use pallas::ledger::traverse as trv; use crate::framework::*; @@ -11,14 +12,13 @@ use crate::framework::*; type CborBlock<'a> = Cow<'a, [u8]>; type CborTx<'a> = Cow<'a, [u8]>; -fn map_block_to_tx(cbor: CborBlock) -> Result, WorkerError> { +fn map_block_to_tx(cbor: CborBlock) -> Result, CborTx)>, WorkerError> { let block = trv::MultiEraBlock::decode(cbor.as_ref()).or_panic()?; let txs: Vec<_> = block .txs() .iter() - .map(|tx| tx.encode()) - .map(Cow::Owned) + .map(|tx| (tx.hash(), Cow::Owned(tx.encode()))) .collect(); Ok(txs) @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { Record::CborBlock(cbor) => { let out = map_block_to_tx(Cow::Borrowed(&cbor))? .into_iter() - .map(|tx| Record::CborTx(tx.into())) + .map(|(hash, cbor)| Record::CborTx(hash, cbor.into())) .collect(); Ok(out) diff --git a/src/filters/split_tx.rs b/src/filters/split_tx.rs new file mode 100644 index 00000000..8f9cb349 --- /dev/null +++ b/src/filters/split_tx.rs @@ -0,0 +1,76 @@ +//! A noop filter used as example and placeholder for other filters + +use gasket::framework::*; +use serde::Deserialize; +use std::borrow::Cow; + +use pallas::ledger::traverse as trv; + +use crate::framework::*; + +type CborTx<'a> = Cow<'a, [u8]>; +type CborUtxo<'a> = Cow<'a, [u8]>; + +fn map_tx_to_utxo(cbor: CborTx) -> Result, Spent)>, WorkerError> { + let tx = trv::MultiEraTx::decode(cbor.as_ref()).or_panic()?; + + let utxos: Vec<_> = tx + .produces() + .iter() + .map(|(idx, utxo)| { + ( + (tx.hash(), *idx as u32), + Some(Cow::Owned(utxo.encode())), + false, + ) + }) + .collect(); + + Ok(utxos) +} + +#[derive(Default, Stage)] +#[stage(name = "filter-split-tx", unit = "ChainEvent", worker = "Worker")] +pub struct Stage { + pub input: FilterInputPort, + pub output: FilterOutputPort, + + #[metric] + ops_count: gasket::metrics::Counter, +} + +#[derive(Default)] +pub struct Worker; + +impl From<&Stage> for Worker { + fn from(_: &Stage) -> Self { + Self + } +} + +gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => { + let output = unit.clone().try_map_record_to_many(|r| match r { + Record::CborTx(_, cbor) => { + let out = map_tx_to_utxo(Cow::Borrowed(&cbor))? + .into_iter() + .map(|(txo, cbor, spent)| Record::CborUtxo(txo, cbor.map(|cbor| cbor.into()), spent)) + .collect(); + + Ok(out) + } + x => Ok(vec![x]), + })?; + + stage.ops_count.inc(1); + + output +}); + +#[derive(Default, Deserialize)] +pub struct Config {} + +impl Config { + pub fn bootstrapper(self, _ctx: &Context) -> Result { + Ok(Stage::default()) + } +} diff --git a/src/filters/wasm_plugin.rs b/src/filters/wasm_plugin.rs index 210cbd53..65b79d98 100644 --- a/src/filters/wasm_plugin.rs +++ b/src/filters/wasm_plugin.rs @@ -21,7 +21,8 @@ impl Stage { fn map_record(&mut self, r: Record) -> Result, Error> { let extism::convert::Json::(output) = match r { Record::CborBlock(x) => self.plugin.call("map_cbor_block", x).unwrap(), - Record::CborTx(x) => self.plugin.call("map_cbor_tx", x).unwrap(), + Record::CborTx(_, x) => self.plugin.call("map_cbor_tx", x).unwrap(), + Record::CborUtxo(_, x, spent) => self.plugin.call("map_cbor_utxo", x).unwrap(), Record::ParsedTx(x) => self .plugin .call("map_u5c_tx", extism::convert::Json(x)) diff --git a/src/framework/mod.rs b/src/framework/mod.rs index 2598e8d8..afb99c94 100644 --- a/src/framework/mod.rs +++ b/src/framework/mod.rs @@ -1,5 +1,6 @@ //! Internal pipeline framework +use pallas::crypto::hash::Hash; use pallas::network::miniprotocols::Point; use serde::Deserialize; use serde_json::{json, Value as JsonValue}; @@ -101,21 +102,37 @@ pub struct Context { pub breadcrumbs: Breadcrumbs, } +pub type Cbor = Vec; +pub type TxRef = Hash<32>; +pub type TxoIdx = u32; +pub type TxoRef = (TxRef, TxoIdx); +pub type Spent = bool; + #[derive(Debug, Clone)] pub enum Record { - CborBlock(Vec), - CborTx(Vec), + CborBlock(Cbor), + CborTx(TxRef, Cbor), + CborUtxo(TxoRef, Option, Spent), + ParsedBlock(ParsedBlock), + ParsedTx(ParsedTx), GenericJson(JsonValue), OuraV1Event(legacy_v1::Event), - ParsedTx(ParsedTx), - ParsedBlock(ParsedBlock), } impl From for JsonValue { fn from(value: Record) -> Self { match value { Record::CborBlock(x) => json!({ "hex": hex::encode(x) }), - Record::CborTx(x) => json!({ "hex": hex::encode(x) }), + Record::CborTx(hash, cbor) => { + json!({ "hash": hash.to_string(), "hex": hex::encode(cbor) }) + } + Record::CborUtxo((hash, idx), cbor, spent) => { + json!({ + "txo": format!("{hash}#{idx}"), + "hex": cbor.map(|x| hex::encode(x)), + "spent": spent, + }) + } Record::ParsedBlock(x) => json!(x), Record::ParsedTx(x) => json!(x), Record::OuraV1Event(x) => json!(x),