Skip to content

Commit

Permalink
feat: implement split tx filter
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega committed Sep 10, 2024
1 parent e24b023 commit 4c6c287
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 11 deletions.
7 changes: 7 additions & 0 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ pub mod noop;
pub mod parse_cbor;
pub mod select;
pub mod split_block;
pub mod split_tx;

#[cfg(feature = "wasm")]
pub mod wasm_plugin;

pub enum Bootstrapper {
Noop(noop::Stage),
SplitBlock(split_block::Stage),
SplitTx(split_tx::Stage),
IntoJson(into_json::Stage),
LegacyV1(legacy_v1::Stage),
ParseCbor(parse_cbor::Stage),
Expand All @@ -30,6 +32,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.input,
Bootstrapper::SplitBlock(p) => &mut p.input,
Bootstrapper::SplitTx(p) => &mut p.input,
Bootstrapper::IntoJson(p) => &mut p.input,
Bootstrapper::LegacyV1(p) => &mut p.input,
Bootstrapper::ParseCbor(p) => &mut p.input,
Expand All @@ -44,6 +47,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(p) => &mut p.output,
Bootstrapper::SplitBlock(p) => &mut p.output,
Bootstrapper::SplitTx(p) => &mut p.output,
Bootstrapper::IntoJson(p) => &mut p.output,
Bootstrapper::LegacyV1(p) => &mut p.output,
Bootstrapper::ParseCbor(p) => &mut p.output,
Expand All @@ -58,6 +62,7 @@ impl Bootstrapper {
match self {
Bootstrapper::Noop(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::SplitBlock(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::SplitTx(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::IntoJson(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -74,6 +79,7 @@ impl Bootstrapper {
pub enum Config {
Noop(noop::Config),
SplitBlock(split_block::Config),
SplitTx(split_tx::Config),
IntoJson(into_json::Config),
LegacyV1(legacy_v1::Config),
ParseCbor(parse_cbor::Config),
Expand All @@ -88,6 +94,7 @@ impl Config {
match self {
Config::Noop(c) => Ok(Bootstrapper::Noop(c.bootstrapper(ctx)?)),
Config::SplitBlock(c) => Ok(Bootstrapper::SplitBlock(c.bootstrapper(ctx)?)),
Config::SplitTx(c) => Ok(Bootstrapper::SplitTx(c.bootstrapper(ctx)?)),
Config::IntoJson(c) => Ok(Bootstrapper::IntoJson(c.bootstrapper(ctx)?)),
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Expand Down
2 changes: 1 addition & 1 deletion src/filters/parse_cbor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl From<&Stage> for Worker {

gasket::impl_mapper!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let output = unit.clone().try_map_record(|r| match r {
Record::CborTx(cbor) => {
Record::CborTx(_, cbor) => {
let tx = trv::MultiEraTx::decode(&cbor).or_panic()?;
let tx = stage.mapper.map_tx(&tx);
Ok(Record::ParsedTx(tx))
Expand Down
8 changes: 4 additions & 4 deletions src/filters/split_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@ use gasket::framework::*;
use serde::Deserialize;
use std::borrow::Cow;

use pallas::crypto::hash::Hash;
use pallas::ledger::traverse as trv;

use crate::framework::*;

type CborBlock<'a> = Cow<'a, [u8]>;
type CborTx<'a> = Cow<'a, [u8]>;

fn map_block_to_tx(cbor: CborBlock) -> Result<Vec<CborTx>, WorkerError> {
fn map_block_to_tx(cbor: CborBlock) -> Result<Vec<(Hash<32>, CborTx)>, WorkerError> {
let block = trv::MultiEraBlock::decode(cbor.as_ref()).or_panic()?;

let txs: Vec<_> = block
.txs()
.iter()
.map(|tx| tx.encode())
.map(Cow::Owned)
.map(|tx| (tx.hash(), Cow::Owned(tx.encode())))
.collect();

Ok(txs)
Expand Down Expand Up @@ -48,7 +48,7 @@ gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
Record::CborBlock(cbor) => {
let out = map_block_to_tx(Cow::Borrowed(&cbor))?
.into_iter()
.map(|tx| Record::CborTx(tx.into()))
.map(|(hash, cbor)| Record::CborTx(hash, cbor.into()))
.collect();

Ok(out)
Expand Down
76 changes: 76 additions & 0 deletions src/filters/split_tx.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
//! A noop filter used as example and placeholder for other filters
use gasket::framework::*;
use serde::Deserialize;
use std::borrow::Cow;

use pallas::ledger::traverse as trv;

use crate::framework::*;

type CborTx<'a> = Cow<'a, [u8]>;
type CborUtxo<'a> = Cow<'a, [u8]>;

fn map_tx_to_utxo(cbor: CborTx) -> Result<Vec<(TxoRef, Option<CborUtxo>, Spent)>, WorkerError> {
let tx = trv::MultiEraTx::decode(cbor.as_ref()).or_panic()?;

let utxos: Vec<_> = tx
.produces()
.iter()
.map(|(idx, utxo)| {
(
(tx.hash(), *idx as u32),
Some(Cow::Owned(utxo.encode())),
false,
)
})
.collect();

Ok(utxos)
}

#[derive(Default, Stage)]
#[stage(name = "filter-split-tx", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: FilterInputPort,
pub output: FilterOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let output = unit.clone().try_map_record_to_many(|r| match r {
Record::CborTx(_, cbor) => {
let out = map_tx_to_utxo(Cow::Borrowed(&cbor))?
.into_iter()
.map(|(txo, cbor, spent)| Record::CborUtxo(txo, cbor.map(|cbor| cbor.into()), spent))
.collect();

Ok(out)
}
x => Ok(vec![x]),
})?;

stage.ops_count.inc(1);

output
});

#[derive(Default, Deserialize)]
pub struct Config {}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
Ok(Stage::default())
}
}
3 changes: 2 additions & 1 deletion src/filters/wasm_plugin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ impl Stage {
fn map_record(&mut self, r: Record) -> Result<Vec<Record>, Error> {
let extism::convert::Json::<serde_json::Value>(output) = match r {
Record::CborBlock(x) => self.plugin.call("map_cbor_block", x).unwrap(),
Record::CborTx(x) => self.plugin.call("map_cbor_tx", x).unwrap(),
Record::CborTx(_, x) => self.plugin.call("map_cbor_tx", x).unwrap(),
Record::CborUtxo(_, x, spent) => self.plugin.call("map_cbor_utxo", x).unwrap(),

Check failure on line 25 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

unused variable: `spent`

Check warning on line 25 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Check (windows-latest, stable)

unused variable: `spent`

Check warning on line 25 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Test Suite

unused variable: `spent`

Check warning on line 25 in src/filters/wasm_plugin.rs

View workflow job for this annotation

GitHub Actions / Check (ubuntu-latest, stable)

unused variable: `spent`
Record::ParsedTx(x) => self
.plugin
.call("map_u5c_tx", extism::convert::Json(x))
Expand Down
27 changes: 22 additions & 5 deletions src/framework/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Internal pipeline framework
use pallas::crypto::hash::Hash;
use pallas::network::miniprotocols::Point;
use serde::Deserialize;
use serde_json::{json, Value as JsonValue};
Expand Down Expand Up @@ -101,21 +102,37 @@ pub struct Context {
pub breadcrumbs: Breadcrumbs,
}

pub type Cbor = Vec<u8>;
pub type TxRef = Hash<32>;
pub type TxoIdx = u32;
pub type TxoRef = (TxRef, TxoIdx);
pub type Spent = bool;

#[derive(Debug, Clone)]
pub enum Record {
CborBlock(Vec<u8>),
CborTx(Vec<u8>),
CborBlock(Cbor),
CborTx(TxRef, Cbor),
CborUtxo(TxoRef, Option<Cbor>, Spent),
ParsedBlock(ParsedBlock),
ParsedTx(ParsedTx),
GenericJson(JsonValue),
OuraV1Event(legacy_v1::Event),
ParsedTx(ParsedTx),
ParsedBlock(ParsedBlock),
}

impl From<Record> for JsonValue {
fn from(value: Record) -> Self {
match value {
Record::CborBlock(x) => json!({ "hex": hex::encode(x) }),
Record::CborTx(x) => json!({ "hex": hex::encode(x) }),
Record::CborTx(hash, cbor) => {
json!({ "hash": hash.to_string(), "hex": hex::encode(cbor) })
}
Record::CborUtxo((hash, idx), cbor, spent) => {
json!({
"txo": format!("{hash}#{idx}"),
"hex": cbor.map(|x| hex::encode(x)),

Check failure on line 132 in src/framework/mod.rs

View workflow job for this annotation

GitHub Actions / Lint Rust

redundant closure
"spent": spent,
})
}
Record::ParsedBlock(x) => json!(x),
Record::ParsedTx(x) => json!(x),
Record::OuraV1Event(x) => json!(x),
Expand Down

0 comments on commit 4c6c287

Please sign in to comment.